GsIterator

class mobgap.pipeline.GsIterator(
data_type: type[FullPipelinePerGsResult] = ...,
aggregations: Sequence[tuple[str, Callable[[list[IteratorResult]], Any]]] = ...,
)
class mobgap.pipeline.GsIterator(
data_type: type[DataclassT] = ...,
aggregations: Sequence[tuple[str, Callable[[list[IteratorResult]], Any]]] = ...,
)

Iterator to split data into gait-sequences and iterate over them individually.

This can be used to easily iterate over gait-sequences and apply algorithms to them, and then collect the results in a convenient way.

Note that you need to specify the expected results by creating a custom dataclass (learn more in the example linked at the bottom of this page). Each result can further be aggregated by providing an aggregation function.

Parameters:
data_type

A dataclass that defines the result type you expect from each iteration. By default, this is GsIterator.DEFAULT_DATA_TYPE, which should handle all typical results of a gait analysis pipeline.

aggregations

An optional list of aggregations to apply to the results. This has the form [(result_name, aggregation_function), ...]. Each aggregation function gets raw_results_ provided as input and can return an arbitrary object. If a result-name is in the list, the aggregation will be applied to it, when accessing the results_ (i.e. results_.{result_name}). If no aggregation is defined for a result, a simple list of all results will be returned. By default, this is GsIterator.DEFAULT_AGGREGATIONS.

NULL_VALUE

(Class attribute) The value that is used to initialize the result dataclass and will remain in the results, if no result was provided for a specific attribute in one or more iterations.

PredefinedParameters

(Class attribute) Predefined parameters that can be used depending on which aggregation you want to use. In all provided cases the data_type is set to FullPipelinePerGsResult. This datatype provides the following attributes:

  • ic_list (pd.DataFrame with a column called ic): The initial contacts for each gait-sequence.

  • cadence_per_sec (pd.DataFrame): The cadence values within each gait-sequence.

  • stride_length_per_sec (pd.DataFrame): The stride length values within each gait-sequence.

  • walking_speed_per_sec (pd.DataFrame): The walking speed values within each gait-sequence.

DefaultAggregators

(Class attribute) Class that holds some aggregator functions that can be used to create custom aggregations.

IteratorResult

(Class attribute) Type alias for the result-type of the iterator. raw_results_ will be a list of these. Note, that when using this outside of the class, this type will be a generic without a type for the result field. You need to bind it as GsIterator.IteratorResult[MyCustomResultType] to get the correct type. This will then be the correct result type of an iterator using the same data_type (i.e. gs_iterator = GsIterator[MyCustomResultType](MyCustomResultType)).

Attributes:
results_

The aggregated results.

raw_results_

List of all results as TypedIteratorResultTuple instances. This is the input to the aggregation functions. An attribute of the result dataclass instance will have the value _NOT_SET if no result was set for it. To check for this, you can use isinstance(val, GsIterator.NULL_VALUE), or use the GsIterator.filter_iterator_results method to remove all results with a NULL_VALUE.

done_

A dictionary indicating whether a specific iterator is done. It can have the keys __main__ or __sub_iter, for the main iteration triggered by iterate or for sub-iterations triggered by iterate_subregions or with_subregion, respectively. The value is True if the respective iteration is done, False if it is currently running, and missing if it was never started. If you try to access the results while the main iterator is not done, an error is raised.

See also

tpcp.misc.BaseTypedIterator

Baseclass of this iterator

tpcp.misc.TypedIterator

Generic version of this iterator

iter_gs

Functional interface to iterate over gs.
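
As a rough illustration of the aggregations contract described in the parameter list above, the following sketch mimics it in plain Python instead of using the real iterator. ToyResult, NOT_SET, aggregate and total_steps are hypothetical stand-ins and not part of mobgap or tpcp; the real raw results carry more information than this toy version.

```python
from dataclasses import dataclass, fields
from typing import Any, Callable

# Hypothetical stand-in for GsIterator.NULL_VALUE.
NOT_SET = object()


@dataclass
class ToyResult:
    """Hypothetical result dataclass with two result attributes."""

    n_steps: Any = NOT_SET
    label: Any = NOT_SET


def aggregate(
    raw_results: list[ToyResult],
    aggregations: list[tuple[str, Callable[[list[ToyResult]], Any]]],
) -> dict[str, Any]:
    """Apply an aggregation where one is registered, else return a plain list."""
    registered = dict(aggregations)
    out = {}
    for field in fields(ToyResult):
        if field.name in registered:
            # Each aggregation function receives all raw results.
            out[field.name] = registered[field.name](raw_results)
        else:
            out[field.name] = [getattr(r, field.name) for r in raw_results]
    return out


def total_steps(results: list[ToyResult]) -> int:
    # Skip iterations that never set a value for `n_steps`.
    return sum(r.n_steps for r in results if r.n_steps is not NOT_SET)


raw = [ToyResult(n_steps=4, label="a"), ToyResult(label="b"), ToyResult(n_steps=6)]
aggregated = aggregate(raw, [("n_steps", total_steps)])
```

Here n_steps is reduced by the registered function, while label falls back to a simple list that still contains the null value for the iteration that never set it.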

Methods

DefaultAggregators()

Available aggregators for the gait-sequence iterator.

PredefinedParameters()

Predefined parameters for the gait-sequence iterator.

clone()

Create a new instance of the class with all parameters copied over.

get_params([deep])

Get parameters for this algorithm.

iterate(data, region_list)

Iterate over the gait sequences one by one.

iterate_subregions(sub_region_list)

Iterate subregions within the current gait sequence.

set_params(**params)

Set the parameters of this Algorithm.

subregion(sub_region_list)

Context manager for handling a subregion of the current gait sequence.

with_subregion(sub_region_list)

Get a subregion of the current gait sequence.

IteratorResult

filter_iterator_results

__init__(data_type: type[FullPipelinePerGsResult] = <class 'mobgap.pipeline._gs_iterator.FullPipelinePerGsResult'>, aggregations: Sequence[tuple[str, Callable[[list[IteratorResult]], Any]]] = cf([('ic_list', <function create_aggregate_df.<locals>.aggregate_df>), ('turn_list', <function create_aggregate_df.<locals>.aggregate_df>), ('cadence_per_sec', <function create_aggregate_df.<locals>.aggregate_df>), ('stride_length_per_sec', <function create_aggregate_df.<locals>.aggregate_df>), ('walking_speed_per_sec', <function create_aggregate_df.<locals>.aggregate_df>)])) -> None
__init__(data_type: type[DataclassT] = <class 'mobgap.pipeline._gs_iterator.FullPipelinePerGsResult'>, aggregations: Sequence[tuple[str, Callable[[list[IteratorResult]], Any]]] = cf([('ic_list', <function create_aggregate_df.<locals>.aggregate_df>), ('turn_list', <function create_aggregate_df.<locals>.aggregate_df>), ('cadence_per_sec', <function create_aggregate_df.<locals>.aggregate_df>), ('stride_length_per_sec', <function create_aggregate_df.<locals>.aggregate_df>), ('walking_speed_per_sec', <function create_aggregate_df.<locals>.aggregate_df>)])) -> None
class DefaultAggregators

Available aggregators for the gait-sequence iterator.

Note that all of them are constructors for aggregators, because they have configuration options. To use one as an aggregator, you need to call it with the desired configuration.

Examples

>>> from mobgap.pipeline import GsIterator
>>> my_aggregation = [
...     (
...         "my_result",
...         GsIterator.DefaultAggregators.create_aggregate_df(
...             "my_result", fix_offset_cols=["my_col"]
...         ),
...     )
... ]
>>> iterator = GsIterator(aggregations=my_aggregation)

Methods

create_aggregate_df([fix_offset_cols, ...])

Create an aggregator for the GS iterator that aggregates dataframe results into a single dataframe.

create_aggregate_df(
fix_offset_cols: Sequence[str] = ('start', 'end'),
*,
fix_offset_index: bool = False,
_null_value: _NotSet = NOT_SET,
) -> Callable[[list[GsIterator.IteratorResult[Any]]], DataFrame]

Create an aggregator for the GS iterator that aggregates dataframe results into a single dataframe.

The aggregator will also fix the offset of the given columns by adding the start value of the gait-sequence. This way the final dataframe will have all sample based time-values relative to the start of the recording.

Parameters:
result_name

The name of the result key within the result object that the aggregation is applied to.

fix_offset_cols

The columns that should be adapted to be relative to the start of the recording. By default, this is ("start", "end"). If you don’t want to fix any columns, you can set this to an empty list.

fix_offset_index

If True, the index of the dataframes will be adapted to be relative to the start of the recording. This only makes sense, if the index represents sample values relative to the start of the gs.

_null_value

A fixed value that should indicate that no results were produced. You don’t need to change this, unless you are doing very advanced stuff.

Notes

Fixing the offset works by getting the start value of the gait-sequence and adding it to the respective columns. This is “easy” for the main iteration, where the gait-sequence contains all the relevant information. For sub-iterations, we need to consider the parent context. For this, the GS-Iterator places the parent gait-sequence in the iteration context.
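
The offset logic in the notes above can be sketched with plain Python data. This is only a simplified illustration that assumes sample-based integer values; ToyRegion and fix_offset are hypothetical names, and the real aggregator works on dataframes and reads the parent gait-sequence from the iteration context.

```python
from dataclasses import dataclass


@dataclass
class ToyRegion:
    """Hypothetical stand-in for a region with sample-based bounds."""

    start: int
    end: int


def fix_offset(rows, region, cols=("start", "end")):
    """Return copies of `rows` with the values in `cols` shifted by the region start."""
    return [
        {key: (value + region.start if key in cols else value) for key, value in row.items()}
        for row in rows
    ]


# Main iteration: values are relative to the start of the gait-sequence.
gs = ToyRegion(start=100, end=200)
strides_in_gs = [{"start": 0, "end": 40, "n": 1}, {"start": 40, "end": 85, "n": 2}]
strides_in_recording = fix_offset(strides_in_gs, gs)

# Sub-iteration: the subregion is relative to `gs`, so both offsets are added.
sub = ToyRegion(start=10, end=50)
events_in_sub = [{"start": 5, "end": 15}]
events_in_recording = fix_offset(fix_offset(events_in_sub, sub), gs)
```

Columns that are not listed in cols (here n) are left untouched, which mirrors the role of fix_offset_cols.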

class PredefinedParameters

Predefined parameters for the gait-sequence iterator.

Attributes:
default_aggregation

The default of the TypedIterator using the FullPipelinePerGsResult as data_type and trying to aggregate all results so that the time values in the final outputs are relative to the start of the recording.

default_aggregation_rel_to_gs

Same as default_aggregation, but the time values in the final outputs are relative to the start of the respective gait-sequence (i.e. no modification of the time values is done).

clone() -> Self

Create a new instance of the class with all parameters copied over.

This will create a new instance of the class itself and of all nested objects.

get_params(deep: bool = True) -> dict[str, Any]

Get parameters for this algorithm.

Parameters:
deep

Only relevant if object contains nested algorithm objects. If this is the case and deep is True, the params of these nested objects are included in the output using a prefix like nested_object_name__ (Note the two “_” at the end)

Returns:
params

Parameter names mapped to their values.
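
To make the double-underscore prefix concrete, here is a minimal sketch of how nested parameters could be flattened. ToyAlgo is a hypothetical class written for this illustration and is not the tpcp base class; the real implementation may differ in its details.

```python
class ToyAlgo:
    """Hypothetical parameter container that mimics the naming scheme only."""

    def __init__(self, **params):
        self._params = params

    def get_params(self, deep=True):
        out = dict(self._params)
        if deep:
            for name, value in self._params.items():
                if isinstance(value, ToyAlgo):
                    # Nested params are exposed as `<name>__<nested_param>`.
                    for sub_name, sub_value in value.get_params(deep=True).items():
                        out[f"{name}__{sub_name}"] = sub_value
        return out


inner = ToyAlgo(window=5)
outer = ToyAlgo(threshold=0.1, smoother=inner)

shallow_params = outer.get_params(deep=False)
deep_params = outer.get_params(deep=True)
```

With deep=False only threshold and smoother are returned; with deep=True the additional key smoother__window appears next to them.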

iterate(
data: DataFrame,
region_list: DataFrame,
) -> Iterator[tuple[tuple[Region, DataFrame], DataclassT]]

Iterate over the gait sequences one by one.

Parameters:
data

The data to iterate over.

region_list

The list of gait-sequences. The “start” and “end” columns are expected to match the units of the data index.

Yields:
region_data : tuple[Region, pd.DataFrame]

The data per gait-sequence. This is a tuple where the first element is a Region object that contains the relevant information about the current GS/WB/region and the second element is the data cut from the data dataframe.

result_object

The empty result object (instance of the provided Dataclass) that should be filled with the results during iteration.
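
The shape of the yielded values can be illustrated with a small generator. This is a sketch under simplifying assumptions: data is a plain list, regions are (start, end) tuples, and toy_iterate and ToyResult are hypothetical names rather than mobgap API.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ToyResult:
    """Hypothetical result object that is filled inside the loop."""

    n_samples: Any = None


def toy_iterate(data, regions, collected):
    """Yield ((region, data_slice), empty_result) for every region."""
    for start, end in regions:
        result = ToyResult()
        # Keep a reference so the filled results are available afterwards.
        collected.append(result)
        yield ((start, end), data[start:end]), result


data = list(range(30))
regions = [(0, 10), (10, 25)]
collected = []

for (region, region_data), r in toy_iterate(data, regions, collected):
    # Results are "returned" by writing to attributes of the result object.
    r.n_samples = len(region_data)

n_samples_per_region = [r.n_samples for r in collected]
```

The loop body never returns anything; it only fills the result object it was handed, which is the same pattern as in the iterate examples on this page.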

iterate_subregions(
sub_region_list: DataFrame,
) -> Iterator[tuple[tuple[Region, DataFrame], DataclassT]]

Iterate subregions within the current gait sequence.

This can be called within the for-loop created by the main iteration to trigger the iteration over subregions. The provided subregions are expected to be relative to the current gait-sequence. Working with subregions can be a little tricky, and we recommend reading through the respective pipeline examples to avoid foot-guns.

Note

If you only have a single GS in your sub_region_list you can also use the with_subregion method and avoid creating a nested for-loop.

Parameters:
sub_region_list

The list of subregions within the current region. The “start” and “end” values need to be relative to the current gait sequence the parent is iterating over.

Returns:
region_data : tuple[Region, pd.DataFrame]

The data per gait-sequence. This is a tuple where the first element is a Region object that contains the relevant information about the current GS/WB/region and the second element is the data cut from the data dataframe.

result_object

The empty result object (instance of the provided Dataclass) that should be filled with the results during iteration.

property results_: DataclassT

The aggregated results.

Note that this returns an instance of the result object, even though the datatypes of the attributes might differ depending on the aggregation function. We still decided that returning an instance of the result object makes sense, as it allows autocompletion of the attributes, even though the associated types might not be correct.

set_params(**params: Any) -> Self

Set the parameters of this Algorithm.

To set parameters of nested objects use nested_object_name__para_name=.

subregion(
sub_region_list: DataFrame,
) -> Iterator[tuple[tuple[Region, DataFrame], DataclassT]]

Context manager for handling a subregion of the current gait sequence.

This is basically just syntactic sugar for the with_subregion method. However, it also performs a check that you are only writing to the intended result object while within the with block, which hopefully prevents some mistakes.

Parameters:
sub_region_list

The list of subregions within the current gait-sequence. The “start” and “end” values need to be relative to the current gait sequence the parent is iterating over. For the with_subregion method this must be just a single GS. If you want to iterate multiple GSs, see iterate_subregions.

Yields:
inputs

A tuple with a gait-sequence object and the data corresponding to the subregion.

result_object

An empty result object for the subregion that can be used to provide results for it.

Notes

Internally, this simply uses iterate_subregions, but completes the iteration over the single GS and returns it.

Examples

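>>> import pandas as pd
>>> from mobgap.pipeline import GsIterator
>>> # `data` is assumed to be the dataframe of the full recording, defined elsewhere.
>>> gs_list = pd.DataFrame({"start": [0, 10, 20], "end": [10, 20, 30]}).rename_axis(
...     "gs_id"
... )
>>> gs_iterator = GsIterator()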
>>> for (gs, data), r in gs_iterator.iterate(data, gs_list):
...     sub_region = pd.DataFrame(
...         {"start": [3], "end": [len(data) - 3]}
...     ).rename_axis("gs_id")
...     with gs_iterator.subregion(sub_region) as ((sub_gs, sub_data), sub_r):
...         # Do something with the subregion data
...         sub_r.my_result = pd.DataFrame({"my_col": [1, 2, 3]})

with_subregion(
sub_region_list: DataFrame,
) -> tuple[tuple[Region, DataFrame], DataclassT]

Get a subregion of the current gait sequence.

For details see iterate_subregions.

Parameters:
sub_region_list

A region list containing a SINGLE subregion (i.e. one row) within the current region. The “start” and “end” values need to be relative to the current gait sequence the parent is iterating over. For the with_subregion method this must be just a single GS. If you want to iterate multiple GSs, see iterate_subregions.

Returns:
inputs

A tuple with a gait-sequence object and the data corresponding to the subregion.

result_object

An empty result object for the subregion that can be used to provide results for it.

Notes

Internally, this simply uses iterate_subregions, but completes the iteration over the single GS and returns it.

Examples

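>>> import pandas as pd
>>> from mobgap.pipeline import GsIterator
>>> # `data` is assumed to be the dataframe of the full recording, defined elsewhere.
>>> gs_list = pd.DataFrame({"start": [0, 10, 20], "end": [10, 20, 30]}).rename_axis(
...     "gs_id"
... )
>>> gs_iterator = GsIterator()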
>>> for (gs, data), r in gs_iterator.iterate(data, gs_list):
...     sub_region = pd.DataFrame(
...         {"start": [3], "end": [len(data) - 3]}
...     ).rename_axis("gs_id")
...     (sub_gs, sub_data), sub_r = gs_iterator.with_subregion(sub_region)
...     # Do something with the subregion data
...     sub_r.my_result = pd.DataFrame({"my_col": [1, 2, 3]})

Examples using mobgap.pipeline.GsIterator

Working with reference data

Gait Sequence Iterator

The Mobilise-D pipeline: Step-by-Step Breakdown

ICD Ionescu

Shin Algo

HKLee algo

ICD Evaluation

McCamley L/R Classifier

Ullrich L/R Classifier

LRC Evaluation

Cadence Evaluation

Stride Length Evaluation

ElGohary Turning Algo