GsIterator#
- class mobgap.pipeline.GsIterator(
- data_type: type[FullPipelinePerGsResult] = ...,
- aggregations: Sequence[tuple[str, Callable[[list[IteratorResult]], Any]]] = ...,
- class mobgap.pipeline.GsIterator(
- data_type: type[DataclassT] = ...,
- aggregations: Sequence[tuple[str, Callable[[list[IteratorResult]], Any]]] = ...,
Iterator to split data into gait-sequences and iterate over them individually.
This can be used to easily iterate over gait-sequences and apply algorithms to them, and then collect the results in a convenient way.
Note that you need to specify the expected results by creating a custom dataclass (learn more in the example linked at the bottom of this page). Each result can further be aggregated by providing an aggregation function.
- Parameters:
- data_type
A dataclass that defines the result type you expect from each iteration. By default, this is
GsIterator.DEFAULT_DATA_TYPE, which should handle all typical results of a gait analysis pipeline.- aggregations
An optional list of aggregations to apply to the results. This has the form
[(result_name, aggregation_function), ...]. Each aggregation function getsraw_results_provided as input and can return an arbitrary object. If a result-name is in the list, the aggregation will be applied to it, when accessing theresults_(i.e.results_.{result_name}). If no aggregation is defined for a result, a simple list of all results will be returned. By default, this isGsIterator.DEFAULT_AGGREGATIONS.- NULL_VALUE
(Class attribute) The value that is used to initialize the result dataclass and will remain in the results, if no result was provided for a specific attribute in one or more iterations.
- PredefinedParameters
(Class attribute) Predefined parameters that can be used depending on which aggregation you want to use. In all provided cases the
data_typeis set toFullPipelinePerGsResult. This datatype provides the following attributes:ic_list(pd.DataFrame with a column calledic): The initial contacts for each gait-sequence.cad_per_sec(pd.DataFrame): The cadence values within each gait-sequence.stride_length(pd.DataFrame): The stride length values within each gait-sequence.gait_speed(pd.DataFrame): The gait speed values within each gait-sequence.
- DefaultAggregators
(Class attribute) Class that holds some aggregator functions that can be used to create custom aggregations.
- IteratorResult
(Class attribute) Type alias for the result-type of the iterator.
raw_results_will be a list of these. Note, that when using this outside of the class, this type will be a generic without a type for theresultfield. You need to bind it asGsIterator.IteratorResult[MyCustomResultType]to get the correct type. This will then be the correct result type of an iterator using the samedata_type(i.e.gs_iterator = GsIterator[MyCustomResultType](MyCustomResultType)).
- Attributes:
results_The aggregated results.
- raw_results_
# TODO: Adapt path once tpcp PR is merged List of all results as
TypedIteratorResultTupleinstances. This is the input to the aggregation functions. The attribute of theresultdataclass instance will have the value of_NOT_SETif no result was set. To check for this, you can useisinstance(val, GsIterator.NULL_VALUE)or theGsIterator.filter_iterator_resultsmethod to remove all results with aNULL_VALUE.- done_
A dictionary indicating of a specific iterator is done. This can have the keys
__main__or__sub_iterfor the main iteration triggered byiterateor sub-iterations triggered byiterate_subregionsorwith_subregion. The value will beTrueif the respective iteration is done,Falseif it is currently running and missing if it was never started. If the main iterator is not done, but you try to access the results, an error will be raised.
See also
tpcp.misb.BaseTypedIteratorBaseclass of this iterator
tpcp.misc.TypedIteratorGeneric version of this iterator
iter_gsFunctional interface to iterate over gs.
Methods
Available aggregators for the gait-sequence iterator.
Predefined parameters for the gait-sequence iterator.
clone()Create a new instance of the class with all parameters copied over.
get_params([deep])Get parameters for this algorithm.
iterate(data, region_list)Iterate over the gait sequences one by one.
iterate_subregions(sub_region_list)Iterate subregions within the current gait sequence.
set_params(**params)Set the parameters of this Algorithm.
subregion(sub_region_list)Context manager for handling a subregion of the current gait sequence.
with_subregion(sub_region_list)Get a subregion of the current gait sequence.
IteratorResult
filter_iterator_results
- __init__(data_type: type[FullPipelinePerGsResult] = <class 'mobgap.pipeline._gs_iterator.FullPipelinePerGsResult'>, aggregations: Sequence[tuple[str, Callable[[list[IteratorResult]], Any]]] = cf([('ic_list', <function create_aggregate_df.<locals>.aggregate_df>), ('turn_list', <function create_aggregate_df.<locals>.aggregate_df>), ('cadence_per_sec', <function create_aggregate_df.<locals>.aggregate_df>), ('stride_length_per_sec', <function create_aggregate_df.<locals>.aggregate_df>), ('walking_speed_per_sec', <function create_aggregate_df.<locals>.aggregate_df>)])) None[source]#
- __init__(data_type: type[DataclassT] = <class 'mobgap.pipeline._gs_iterator.FullPipelinePerGsResult'>, aggregations: Sequence[tuple[str, Callable[[list[IteratorResult]], Any]]] = cf([('ic_list', <function create_aggregate_df.<locals>.aggregate_df>), ('turn_list', <function create_aggregate_df.<locals>.aggregate_df>), ('cadence_per_sec', <function create_aggregate_df.<locals>.aggregate_df>), ('stride_length_per_sec', <function create_aggregate_df.<locals>.aggregate_df>), ('walking_speed_per_sec', <function create_aggregate_df.<locals>.aggregate_df>)])) None
- class DefaultAggregators[source]#
Available aggregators for the gait-sequence iterator.
Note, that all of them are constructors for aggregators, as they have some configuration options. To use them as aggregators, you need to call them with the desired configuration.
Examples
>>> from mobgap.pipeline import GsIterator >>> my_aggregation = [ ... ( ... "my_result", ... GsIterator.DefaultAggregators.create_aggregate_df( ... "my_result", fix_offset_cols=["my_col"] ... ), ... ) ... ] >>> iterator = GsIterator(aggregations=my_aggregation)
Methods
create_aggregate_df([fix_offset_cols, ...])Create an aggregator for the GS iterator that aggregates dataframe results into a single dataframe.
- create_aggregate_df(
- fix_offset_cols: Sequence[str] = ('start', 'end'),
- *,
- fix_offset_index: bool = False,
- _null_value: _NotSet = NOT_SET,
Create an aggregator for the GS iterator that aggregates dataframe results into a single dataframe.
The aggregator will also fix the offset of the given columns by adding the start value of the gait-sequence. This way the final dataframe will have all sample based time-values relative to the start of the recording.
- Parameters:
- result_name
The name of the result key within the result object, the aggregation is applied to
- fix_offset_cols
The columns that should be adapted to be relative to the start of the recording. By default, this is
("start", "end"). If you don’t want to fix any columns, you can set this to an empty list.- fix_offset_index
If True, the index of the dataframes will be adapted to be relative to the start of the recording. This only makes sense, if the index represents sample values relative to the start of the gs.
- _null_value
A fixed value that should indicate that no results were produced. You don’t need to change this, unless you are doing very advanced stuff.
Notes
Fixing the offset works by getting the start value of the gait-sequence and adding it to the respective columns. This is “easy” for the main iteration, where the gait-sequences contains all the relevant information. For sub-iteration, we need to consider the parent context. For this, the GS-Iterator, places the parent gait-sequence in the iteration context.
- class PredefinedParameters[source]#
Predefined parameters for the gait-sequence iterator.
- Attributes:
- default_aggregation
The default of the TypedIterator using the
FullPipelinePerGsResultas data_type and trying to aggregate all results so that the time values in the final outputs are relative to the start of the recording.- default_aggregation_rel_to_gs
Same as
default_aggregation, but the time values in the final outputs are relative to the start of the respective gait-sequence (i.e. no modification of the time values is done).
- clone() Self[source]#
Create a new instance of the class with all parameters copied over.
This will create a new instance of the class itself and all nested objects
- get_params(deep: bool = True) dict[str, Any][source]#
Get parameters for this algorithm.
- Parameters:
- deep
Only relevant if object contains nested algorithm objects. If this is the case and deep is True, the params of these nested objects are included in the output using a prefix like
nested_object_name__(Note the two “_” at the end)
- Returns:
- params
Parameter names mapped to their values.
- iterate( ) Iterator[tuple[tuple[Region, DataFrame], DataclassT]][source]#
Iterate over the gait sequences one by one.
- Parameters:
- data
The data to iterate over.
- region_list
The list of gait-sequences. The “start” and “end” columns are expected to match the units of the data index.
- Yields:
- region_datatuple[Region, pd.DataFrame]
The data per gait-sequence. This is a tuple where the first element is a
Regionobject that contains the relevant information about the current GS/WB/region and the second element is the data cut from the data dataframe.- result_object
The empty result object (instance of the provided Dataclass) that should be filled with the results during iteration.
- iterate_subregions(
- sub_region_list: DataFrame,
Iterate subregions within the current gait sequence.
This can be called within the for-loop created by the main iteration to trigger the iteration over subregions. The provided subregions are expected to be relative to the current gait-sequence. Working with subregions, can be a little tricky, and we recommend you read through the respective pipeline examples to avoid foot-guns.
Note
If you only have a single GS in your
sub_region_listyou can also use thewith_subregionmethod and avoid creating a nested for-loop.- Parameters:
- sub_region_list
The list of subregions within the current region. The “start” and “end” values need to be relative to the current gait sequence the parent is iterating over.
- Returns:
- region_datatuple[Region, pd.DataFrame]
The data per gait-sequence. This is a tuple where the first element is a
Regionobject that contains the relevant information about the current GS/WB/region and the second element is the data cut from the data dataframe.- result_object
The empty result object (instance of the provided Dataclass) that should be filled with the results during iteration.
- property results_: DataclassT#
The aggregated results.
Note, that this returns an instance of the result object, even-though the datatypes of the attributes might be different depending on the aggregation function. We still decided it makes sense to return an instance of the result object, as it will allow to autocomplete the attributes, even-though the associated times might not be correct.
- set_params(**params: Any) Self[source]#
Set the parameters of this Algorithm.
To set parameters of nested objects use
nested_object_name__para_name=.
- subregion(
- sub_region_list: DataFrame,
Context manager for handling a subregion of the current gait sequence.
This is basically just syntactic sugar for the
with_subregionmethod. However, it also performs a check that you are only writing to the intended result object while within thewithblock, which hopefully prevents some mistakes.- Parameters:
- sub_region_list
The list of subregions within the current gait-sequence. The “start” and “end” values need to be relative to the current gait sequence the parent is iterating over. For the
with_subregionsmethod this must be just a single GS. If you want to iterate multiple GSs seeiterate_subregions.
- Yields:
- inputs
A tuple with a gait-sequence object and the data corresponding to the subregion.
- result_object
An empty result object for the subregion that can be used to provide results for it.
Notes
Internally, this simply uses
iterate_subregions, but completes the iteration over the single GS and returns it.Examples
>>> gs_list = pd.DataFrame({"start": [0, 10, 20], "end": [10, 20, 30]}).rename_axis( ... "gs_id" ... ) >>> gs_iterator = GsIterator() >>> for (gs, data), r in gs_iterator.iterate(data, gs_list): ... sub_region = pd.DataFrame( ... {"start": [3], "end": [len(data) - 3]} ... ).rename_axis("gs_id") ... with gs_iterator.subregion(sub_region) as ((sub_gs, sub_data), sub_r): ... # Do something with the subregion data ... sub_r.my_result = pd.DataFrame({"my_col": [1, 2, 3]})
- with_subregion(
- sub_region_list: DataFrame,
Get a subregion of the current gait sequence.
For details see
iterate_subregions.- Parameters:
- sub_region_list
A region list containing a SINGLE subregion (i.e. on row) within the current region. The “start” and “end” values need to be relative to the current gait sequence the parent is iterating over. For the
with_subregionsmethod this must be just a single GS. If you want to iterate multiple GSs seeiterate_subregions.
- Returns:
- inputs
A tuple with a gait-sequence object and the data corresponding to the subregion.
- result_object
An empty result object for the subregion that can be used to provide results for it.
Notes
Internally, this simply uses
iterate_subregions, but completes the iteration over the single GS and returns it.Examples
>>> gs_list = pd.DataFrame({"start": [0, 10, 20], "end": [10, 20, 30]}).rename_axis( ... "gs_id" ... ) >>> gs_iterator = GsIterator() >>> for (gs, data), r in gs_iterator.iterate(data, gs_list): ... sub_region = pd.DataFrame( ... {"start": [3], "end": [len(data) - 3]} ... ).rename_axis("gs_id") ... (sub_gs, sub_data), sub_r = gs_iterator.with_subregion(sub_region) ... # Do something with the subregion data ... sub_r.my_result = pd.DataFrame({"my_col": [1, 2, 3]})