Skip to content

motorhead.aggregation

AggregationStage = Literal['$addFields', '$bucket', '$bucketAuto', '$changeStream', '$changeStreamSplitLargeEvent', '$collStats', '$count', '$currentOp', '$densify', '$documents', '$facet', '$fill', '$geoNear', '$graphLookup', '$group', '$indexStats', '$limit', '$listLocalSessions', '$listSampledQueries', '$listSearchIndexes', '$listSessions', '$lookup', '$match', '$merge', '$out', '$planCacheStats', '$project', '$querySettings', '$redact', '$replaceRoot', '$replaceWith', '$sample', '$search', '$searchMeta', '$set', '$setWindowFields', '$shardedDataDistribution', '$skip', '$sort', '$sortByCount', '$unionWith', '$unset', '$unwind', '$vectorSearch'] module-attribute

Aggregation pipeline stage.

Aggregation

Bases: list[dict[str, AggregationData]]

Aggregation pipeline.

Source code in motorhead/aggregation.py
class Aggregation(list[dict[str, AggregationData]]):
    """A list of aggregation pipeline stages that can be extended by chaining."""

    def __init__(self, stages: Iterable[AggregationData] = ()) -> None:
        """
        Creates the pipeline from the given stages.

        Arguments:
            stages: The stages the pipeline starts with (none by default).
        """
        super().__init__(stages)

    def stage(self, stage: AggregationStage, value: AggregationData | Clause) -> Self:
        """
        Builds a stage from the operator and its content and appends it
        to the end of this pipeline.

        Arguments:
            stage: The stage operator.
            value: The content of the stage operator.

        Returns:
            This pipeline, so that calls can be chained.
        """
        new_stage = make_aggregation_stage(stage, value)
        self.append(new_stage)
        return self

__init__(stages=())

Initialization.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stages` | `Iterable[AggregationData]` | The aggregation pipeline stages. | `()` |

Source code in motorhead/aggregation.py
def __init__(self, stages: Iterable[AggregationData] = ()) -> None:
    """
    Initializes the pipeline by passing the given stages to the parent initializer.

    Arguments:
        stages: The aggregation pipeline stages; defaults to an empty tuple.
    """
    super().__init__(stages)

stage(stage, value)

Adds the stage to the aggregation pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stage` | `AggregationStage` | The stage operator. | *required* |
| `value` | `AggregationData \| Clause` | The stage operator's content. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Self` | The aggregation pipeline. |

Source code in motorhead/aggregation.py
def stage(self, stage: AggregationStage, value: AggregationData | Clause) -> Self:
    """
    Builds a stage from the operator and its content and appends it
    to the end of this pipeline.

    Arguments:
        stage: The stage operator.
        value: The content of the stage operator.

    Returns:
        This pipeline, so that calls can be chained.
    """
    new_stage = make_aggregation_stage(stage, value)
    self.append(new_stage)
    return self

make_aggregation_stage(stage, value)

Creates an aggregation pipeline stage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `stage` | `AggregationStage` | The stage operator. | *required* |
| `value` | `AggregationData \| Clause` | The stage operator's content. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, AggregationData]` | The aggregation pipeline stage. |

Source code in motorhead/aggregation.py
def make_aggregation_stage(
    stage: AggregationStage, value: AggregationData | Clause
) -> dict[str, AggregationData]:
    """
    Builds a single aggregation pipeline stage as a one-entry dictionary.

    If the value has a `to_mongo` attribute, the result of calling
    `value.to_mongo()` becomes the stage content; otherwise the value
    is used unchanged.

    Arguments:
        stage: The stage operator, used as the dictionary key.
        value: The content of the stage operator.

    Returns:
        A dictionary mapping the stage operator to its content.
    """
    if hasattr(value, "to_mongo"):
        content = value.to_mongo()
    else:
        content = value
    return {stage: content}