neosqlite.collection.query_helper.aggregation module

Aggregation pipeline methods for NeoSQLite.

This module contains the AggregationMixin class, which provides methods for building and executing MongoDB-like aggregation pipelines using SQL.

class neosqlite.collection.query_helper.aggregation.AggregationMixin

Bases: object

Mixin class providing aggregation pipeline methods.

This mixin assumes it will be used with a class that has the following:

self.collection

A collection instance with:
  • db – Database connection

  • name – Collection name

  • _load – Method to load documents

  • _get_val – Method to get values from documents

  • _set_val – Method to set values in documents

self._jsonb_supported

Whether JSONB is supported

self._json_function_prefix

“json” or “jsonb”

self._json_each_function

“json_each” or “jsonb_each”

self._build_simple_where_clause

Method to build WHERE clauses

self._reorder_pipeline_for_indexes

Method to reorder pipelines

self._estimate_pipeline_cost

Method to estimate costs

self._optimize_match_pushdown

Method to optimize match pushdown

self._is_datetime_indexed_field

Method to check datetime indexes

self._build_group_query

Method to build group queries

self._apply_query

Method to apply queries to documents
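The requirements above can be checked mechanically before mixing the class in. The following is a minimal sketch, not part of NeoSQLite: the helper name `missing_requirements` is hypothetical, and the attribute names are copied from the list above.

```python
from types import SimpleNamespace

# Attribute names taken from the requirement list above.
REQUIRED_ATTRIBUTES = (
    "collection",
    "_jsonb_supported",
    "_json_function_prefix",
    "_json_each_function",
    "_build_simple_where_clause",
    "_reorder_pipeline_for_indexes",
    "_estimate_pipeline_cost",
    "_optimize_match_pushdown",
    "_is_datetime_indexed_field",
    "_build_group_query",
    "_apply_query",
)


def missing_requirements(host: object) -> list[str]:
    """Return the required attribute names that `host` does not provide (hypothetical helper)."""
    return [name for name in REQUIRED_ATTRIBUTES if not hasattr(host, name)]


# A deliberately incomplete stand-in for a host object.
host = SimpleNamespace(collection=object(), _jsonb_supported=False)
missing = missing_requirements(host)
```

An empty result means the host object exposes every name the mixin relies on; it does not verify that the values behave correctly.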

collection: Collection
_jsonb_supported: bool
_json_function_prefix: str
_json_each_function: str
_build_simple_where_clause: Any
_reorder_pipeline_for_indexes: Any
_estimate_pipeline_cost: Any
_optimize_match_pushdown: Any
_is_datetime_indexed_field: Any
_apply_query: Any
_build_aggregation_query(pipeline: list[dict[str, Any]]) → tuple[str, list[Any], list[str] | None] | None

Builds a SQL query for the given MongoDB-like aggregation pipeline.

This method constructs a SQL query from the stages in the aggregation pipeline. It currently handles $match, $sort, $skip, and $limit stages, while $group stages are handled in Python. The method returns a tuple containing the SQL command, a list of parameters, and an optional list of output field names.

Parameters:

pipeline (list[dict[str, Any]]) – A list of aggregation pipeline stages.

Returns:

A tuple containing the SQL command, a list of parameters, and a list of output field names (None if not applicable), or None if the pipeline contains unsupported stages or complex queries.

Return type:

tuple[str, list[Any], list[str] | None] | None
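To illustrate the kind of translation described here, the sketch below maps a small pipeline onto one SQL statement. It is an assumption-laden illustration, not NeoSQLite's implementation: the function `build_simple_query`, the `(id, data)` table layout, and the restriction to equality-only $match in a fixed stage order are all hypothetical simplifications, and the table name is interpolated without escaping.

```python
from __future__ import annotations

import json
import sqlite3
from typing import Any

# Stages this sketch accepts, in the only order it accepts them.
STAGE_RANK = {"$match": 0, "$sort": 1, "$skip": 2, "$limit": 3}


def build_simple_query(table: str, pipeline: list[dict[str, Any]]) -> tuple[str, list[Any]] | None:
    """Return (sql, params), or None when a Python fallback would be needed."""
    where: list[str] = []
    where_params: list[Any] = []
    order: list[str] = []
    order_params: list[Any] = []
    limit, skip = -1, 0  # in SQLite, LIMIT -1 means "no limit"
    last_rank = -1
    for stage in pipeline:
        if len(stage) != 1:
            return None
        (name, spec), = stage.items()
        rank = STAGE_RANK.get(name)
        if rank is None or rank <= last_rank:
            return None  # unsupported, repeated, or out-of-order stage
        last_rank = rank
        if name == "$match":
            for field, value in spec.items():
                if field.startswith("$") or isinstance(value, (dict, list)):
                    return None  # operators and nested values are out of scope here
                where.append("json_extract(data, ?) = ?")
                where_params += [f"$.{field}", value]
        elif name == "$sort":
            for field, direction in spec.items():
                order.append("json_extract(data, ?) " + ("DESC" if direction == -1 else "ASC"))
                order_params.append(f"$.{field}")
        elif name == "$skip":
            skip = int(spec)
        else:  # "$limit"
            limit = int(spec)
    sql = f"SELECT id, data FROM {table}"
    if where:
        sql += " WHERE " + " AND ".join(where)
    if order:
        sql += " ORDER BY " + ", ".join(order)
    sql += " LIMIT ? OFFSET ?"
    return sql, where_params + order_params + [limit, skip]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, data TEXT)")
people = [
    {"name": "ann", "age": 31, "city": "Oslo"},
    {"name": "bob", "age": 25, "city": "Oslo"},
    {"name": "cy", "age": 40, "city": "Bergen"},
    {"name": "dee", "age": 52, "city": "Oslo"},
]
conn.executemany("INSERT INTO users (data) VALUES (?)", [(json.dumps(p),) for p in people])

built = build_simple_query(
    "users", [{"$match": {"city": "Oslo"}}, {"$sort": {"age": -1}}, {"$limit": 2}]
)
sql, params = built
names = [json.loads(data)["name"] for _id, data in conn.execute(sql, params)]
unsupported = build_simple_query("users", [{"$group": {"_id": "$city"}}])
```

Returning None for anything the builder does not recognize mirrors the documented contract: the caller can then fall back to Python processing.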

_optimize_unwind_group_pattern(group_stage_index: int, pipeline: list[dict[str, Any]]) → tuple[str, list[Any], list[str]] | None

Optimize $unwind + $group pattern with SQL-based processing.

This method handles the specific optimization pattern where a $unwind stage is immediately followed by a $group stage. It supports all accumulator operations by leveraging the general _build_group_query method while handling the $unwind optimization.

Parameters:
  • group_stage_index (int) – Index of the $group stage in the pipeline.

  • pipeline (list[dict[str, Any]]) – The complete aggregation pipeline.

Returns:

A tuple of the SQL command, its parameters, and the output field names if the optimization is possible; None otherwise.

Return type:

tuple[str, list[Any], list[str]] | None
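As a rough picture of what a combined $unwind + $group query can look like, the sketch below counts tag occurrences in a single SQL statement. The table name, the `(id, data)` layout, and the exact SQL are assumptions for illustration; the SQL that NeoSQLite actually generates may differ.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, data TEXT)")
posts = [{"tags": ["sql", "json"]}, {"tags": ["sql"]}, {"tags": []}]
conn.executemany("INSERT INTO posts (data) VALUES (?)", [(json.dumps(p),) for p in posts])

# Rough SQL counterpart of:
#   [{"$unwind": "$tags"}, {"$group": {"_id": "$tags", "total": {"$sum": 1}}}]
# json_each yields one row per array element; GROUP BY then aggregates those rows.
sql = """
    SELECT je.value AS _id, COUNT(*) AS total
    FROM posts, json_each(posts.data, '$.tags') AS je
    GROUP BY je.value
    ORDER BY _id
"""
result = [{"_id": tag, "total": total} for tag, total in conn.execute(sql)]
```

Doing both stages in one statement avoids materializing the unwound rows in Python before grouping them.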

_build_unwind_query(pipeline_index: int, pipeline: list[dict[str, Any]], unwind_stages: list[str]) → tuple[str, list[Any], list[str] | None] | None

Builds a SQL query for a sequence of $unwind stages.

This method constructs a SQL query to handle one or more consecutive $unwind stages in an aggregation pipeline. It processes array fields by joining with SQLite’s json_each/jsonb_each function to “unwind” the arrays into separate rows. The method also handles necessary array type checks and integrates with other pipeline stages like $match, $sort, $skip, and $limit.

Parameters:
  • pipeline_index (int) – The index of the first $unwind stage in the pipeline.

  • pipeline (list[dict[str, Any]]) – The full aggregation pipeline.

  • unwind_stages (list[str]) – A list of field paths to unwind, each prefixed with ‘$’.

Returns:

A tuple containing:
  • The constructed SQL command string.

  • A list of parameters for the SQL query.

  • A list of output field names (None if not applicable).

Returns None if the unwind stages cannot be processed with SQL and a fallback to Python is required.

Return type:

tuple[str, list[Any], list[str] | None] | None
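The join-with-json_each idea can be sketched for a single $unwind stage as follows. The helper `build_unwind_sql` and the `(id, data)` table layout are hypothetical, and the sketch assumes scalar array elements; it also simply skips documents whose field is not an array, which may not match NeoSQLite's handling.

```python
from __future__ import annotations

import json
import sqlite3


def build_unwind_sql(table: str, field: str) -> tuple[str, list[str]]:
    """Build SQL that emits one document per array element (hypothetical helper)."""
    path = f"$.{field}"
    sql = (
        f"SELECT json_set({table}.data, ?, je.value) "  # replace the array with one element
        f"FROM {table}, json_each({table}.data, ?) AS je "
        f"WHERE json_type({table}.data, ?) = 'array' "  # array type check
        f"ORDER BY {table}.id, je.key"
    )
    return sql, [path, path, path]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, data TEXT)")
docs = [
    {"name": "a", "tags": ["x", "y"]},
    {"name": "b", "tags": "solo"},  # not an array: filtered out by the type check
    {"name": "c", "tags": []},      # empty array: produces no rows
    {"name": "d"},                  # missing field: produces no rows
]
conn.executemany("INSERT INTO items (data) VALUES (?)", [(json.dumps(d),) for d in docs])

sql, params = build_unwind_sql("items", "tags")
unwound = [json.loads(row[0]) for row in conn.execute(sql, params)]
```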

_build_unwind_from_clause(field_names: list[str]) → tuple[str, dict[str, str]]

Builds the FROM clause for a SQL query with one or more $unwind stages.

This method constructs the FROM clause needed to handle multiple $unwind operations in an aggregation pipeline. It creates joins with SQLite’s json_each/jsonb_each function for each field to be unwound, allowing array elements to be processed as separate rows. It also manages nested unwinds by identifying parent-child relationships between fields.

Parameters:

field_names (list[str]) – A list of field paths to unwind. Each path should be a string without the leading ‘$’.

Returns:

A tuple containing:
  • The constructed FROM clause as a string.

  • A dictionary mapping each unwound field path to its corresponding alias (e.g., ‘je1’, ‘je2’).

Return type:

tuple[str, dict[str, str]]
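A FROM clause of this shape might be assembled as in the sketch below. The function takes an extra `table` argument and picks the longest matching parent path; both are assumptions made for the example, and field names are interpolated without escaping, so this is not suitable for untrusted input.

```python
from __future__ import annotations

import json
import sqlite3


def build_unwind_from_clause(table: str, field_names: list[str]) -> tuple[str, dict[str, str]]:
    """Join the table with one json_each call per unwound field (hypothetical sketch)."""
    sources = [table]
    aliases: dict[str, str] = {}
    for number, field in enumerate(field_names, start=1):
        # A field nested under an already unwound field reads from that alias.
        parents = [p for p in aliases if field.startswith(p + ".")]
        if parents:
            parent = max(parents, key=len)
            relative = field[len(parent) + 1:]
            source = f"json_each({aliases[parent]}.value, '$.{relative}')"
        else:
            source = f"json_each({table}.data, '$.{field}')"
        aliases[field] = f"je{number}"
        sources.append(f"{source} AS je{number}")
    return "FROM " + ", ".join(sources), aliases


from_clause, aliases = build_unwind_from_clause("docs", ["orders", "orders.items"])

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
doc = {"orders": [{"items": ["a", "b"]}, {"items": ["c"]}]}
conn.execute("INSERT INTO docs (data) VALUES (?)", (json.dumps(doc),))
items = [
    row[0]
    for row in conn.execute(f"SELECT je2.value {from_clause} ORDER BY je1.key, je2.key")
]
```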

_build_unwind_from_clause_impl(field_names: list[str]) → tuple[str, dict[str, str]]

Internal implementation for building the FROM clause.

Parameters:

field_names (list[str]) – A list of field paths to unwind.

Returns:

A tuple containing the FROM clause and the unwound fields mapping.

Return type:

tuple[str, dict[str, str]]

_find_parent_unwind(field_name: str, unwound_fields: dict[str, str]) → tuple[str | None, str | None]

Find the parent unwind field for a nested unwind.

This method searches through already processed unwind fields to find a parent field that the current field is nested within. This is used to properly construct SQL joins for nested array unwinding operations.

Parameters:
  • field_name (str) – The field name to find the parent for.

  • unwound_fields (dict[str, str]) – A dictionary mapping field paths to their aliases.

Returns:

A tuple containing the parent field name and its alias, or (None, None) if no parent is found.

Return type:

tuple[str | None, str | None]
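One plausible way to perform this lookup is a dotted-prefix match, sketched below. Preferring the longest (most deeply nested) parent is an assumption of this example, not something the documentation states.

```python
from __future__ import annotations


def find_parent_unwind(
    field_name: str, unwound_fields: dict[str, str]
) -> tuple[str | None, str | None]:
    """Return (parent_path, alias) for the nearest unwound ancestor, else (None, None)."""
    best: str | None = None
    for candidate in unwound_fields:
        # Require a "." boundary so that "ab" is not treated as a child of "a".
        if field_name.startswith(candidate + "."):
            if best is None or len(candidate) > len(best):
                best = candidate
    if best is None:
        return None, None
    return best, unwound_fields[best]


unwound = {"orders": "je1", "orders.items": "je2"}
nested = find_parent_unwind("orders.items.tags", unwound)
top_level = find_parent_unwind("ordersx", unwound)
```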

_build_sort_skip_limit_clauses(pipeline: list[dict[str, Any]], start_index: int, end_index: int, unwound_fields: dict[str, str]) → tuple[str, str, str]

Build ORDER BY, LIMIT, and OFFSET clauses for aggregation queries.

This method constructs the SQL clauses for sorting, skipping, and limiting results in an aggregation pipeline. It handles both regular fields and fields that have been unwound from arrays, ensuring proper SQL generation for nested array elements.

Parameters:
  • pipeline (list[dict[str, Any]]) – The aggregation pipeline stages.

  • start_index (int) – The starting index in the pipeline to process stages from.

  • end_index (int) – The ending index in the pipeline to process stages to.

  • unwound_fields (dict[str, str]) – A mapping of field names to their aliases for unwound fields.

Returns:

A tuple containing:
  • The ORDER BY clause (empty string if no sorting)

  • The LIMIT clause (empty string if no limit)

  • The OFFSET clause (empty string if no offset)

Return type:

tuple[str, str, str]
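The clause construction can be sketched as below. Several details are assumptions of the example rather than documented behavior: `end_index` is treated as exclusive, unwound fields are sorted by their alias's `value` column, and `LIMIT -1` is emitted when only an offset is present, because SQLite does not accept OFFSET without LIMIT.

```python
from __future__ import annotations

from typing import Any


def build_sort_skip_limit_clauses(
    pipeline: list[dict[str, Any]],
    start_index: int,
    end_index: int,
    unwound_fields: dict[str, str],
) -> tuple[str, str, str]:
    """Return (order_by, limit, offset) for pipeline[start_index:end_index] (sketch)."""
    order_by = limit = offset = ""
    for stage in pipeline[start_index:end_index]:
        if "$sort" in stage:
            terms = []
            for field, direction in stage["$sort"].items():
                if field in unwound_fields:
                    expr = f"{unwound_fields[field]}.value"  # element of an unwound array
                else:
                    expr = f"json_extract(data, '$.{field}')"
                terms.append(f"{expr} {'DESC' if direction == -1 else 'ASC'}")
            order_by = "ORDER BY " + ", ".join(terms)
        elif "$skip" in stage:
            offset = f"OFFSET {int(stage['$skip'])}"
        elif "$limit" in stage:
            limit = f"LIMIT {int(stage['$limit'])}"
    if offset and not limit:
        limit = "LIMIT -1"  # SQLite: a negative LIMIT means "no upper bound"
    return order_by, limit, offset


pipeline = [{"$unwind": "$tags"}, {"$sort": {"tags": -1, "name": 1}}, {"$skip": 5}]
clauses = build_sort_skip_limit_clauses(pipeline, 1, 3, {"tags": "je1"})
```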

_build_group_query(group_spec: dict[str, Any]) → tuple[str, str, list[str]] | None

Builds the SELECT and GROUP BY clauses for a $group stage.

This method constructs the SQL SELECT and GROUP BY clauses for MongoDB-like $group aggregation stages that can be handled directly in SQL. It supports grouping by a single field and accumulator operations such as $sum, $avg, $min, $max, $count, $push, and $addToSet.

Parameters:

group_spec (dict[str, Any]) – A dictionary representing the $group stage specification. It should contain an “_id” field for grouping and accumulator operations for other fields.

Returns:

A tuple containing:
  • The SELECT clause string with all required expressions

  • The GROUP BY clause string

  • A list of output field names

Returns None if the group specification contains unsupported operations that require Python-based processing.

Return type:

tuple[str, str, list[str]] | None
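A simplified version of this mapping from accumulators to SQL aggregates is sketched below. The choice of SQL function for each accumulator, the `(id, data)` table layout, and the function name are assumptions for illustration, and field names are interpolated without escaping.

```python
from __future__ import annotations

import json
import sqlite3
from typing import Any

# Assumed accumulator-to-SQL templates; "{}" receives the operand expression.
TEMPLATES = {
    "$sum": "SUM({})",
    "$avg": "AVG({})",
    "$min": "MIN({})",
    "$max": "MAX({})",
    "$push": "json_group_array({})",
    "$addToSet": "json_group_array(DISTINCT {})",
}


def build_group_query(group_spec: dict[str, Any]) -> tuple[str, str, list[str]] | None:
    """Return (select_clause, group_by_clause, output_fields), or None if unsupported."""
    group_id = group_spec.get("_id")
    if not (isinstance(group_id, str) and group_id.startswith("$")):
        return None  # only single-field grouping is sketched here
    key = f"json_extract(data, '$.{group_id[1:]}')"
    select = [f"{key} AS _id"]
    fields = ["_id"]
    for out_name, accumulator in group_spec.items():
        if out_name == "_id":
            continue
        if not isinstance(accumulator, dict) or len(accumulator) != 1:
            return None
        (op, operand), = accumulator.items()
        if op == "$count":
            expr = "COUNT(*)"
        elif op in TEMPLATES and isinstance(operand, str) and operand.startswith("$"):
            expr = TEMPLATES[op].format(f"json_extract(data, '$.{operand[1:]}')")
        elif op == "$sum" and isinstance(operand, (int, float)) and not isinstance(operand, bool):
            expr = f"SUM({operand!r})"  # e.g. {"$sum": 1} counts the rows
        else:
            return None
        select.append(f'{expr} AS "{out_name}"')
        fields.append(out_name)
    return "SELECT " + ", ".join(select), f"GROUP BY {key}", fields


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (id INTEGER PRIMARY KEY, data TEXT)")
sales = [{"region": "n", "amt": 10}, {"region": "n", "amt": 5}, {"region": "s", "amt": 7}]
conn.executemany("INSERT INTO sales (data) VALUES (?)", [(json.dumps(s),) for s in sales])

spec = {"_id": "$region", "total": {"$sum": "$amt"}, "n": {"$sum": 1}}
select_clause, group_by_clause, fields = build_group_query(spec)
rows = conn.execute(f"{select_clause} FROM sales {group_by_clause} ORDER BY _id").fetchall()
```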

_process_group_stage(group_query: dict[str, Any], docs: list[dict[str, Any]]) → list[dict[str, Any]]

Process the $group stage of an aggregation pipeline.

This method groups documents by a specified field and performs specified accumulator operations on other fields.

Parameters:
  • group_query (dict[str, Any]) – A dictionary representing the $group stage of the aggregation pipeline.

  • docs (list[dict[str, Any]]) – A list of documents to be grouped.

Returns:

A list of grouped documents with applied accumulator operations.

Return type:

list[dict[str, Any]]
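In-Python grouping of this kind can be sketched with a dictionary keyed by the group value. The example below is a reduced illustration that assumes top-level fields, hashable group keys, and only the $sum, $push, and $addToSet accumulators; it is not the library's implementation.

```python
from __future__ import annotations

from typing import Any


def resolve(expr: Any, doc: dict[str, Any]) -> Any:
    """Resolve "$field" against a document; any other value is a constant."""
    if isinstance(expr, str) and expr.startswith("$"):
        return doc.get(expr[1:])
    return expr


def process_group_stage(group_query: dict[str, Any], docs: list[dict[str, Any]]) -> list[dict[str, Any]]:
    groups: dict[Any, dict[str, Any]] = {}
    for doc in docs:
        key = resolve(group_query["_id"], doc)
        out = groups.setdefault(key, {"_id": key})
        for field, accumulator in group_query.items():
            if field == "_id":
                continue
            (op, operand), = accumulator.items()
            value = resolve(operand, doc)
            if op == "$sum":
                out[field] = out.get(field, 0) + (value or 0)  # missing values count as 0
            elif op == "$push":
                out.setdefault(field, []).append(value)
            elif op == "$addToSet":
                bucket = out.setdefault(field, [])
                if value not in bucket:
                    bucket.append(value)
            else:
                raise NotImplementedError(f"accumulator {op} is not covered by this sketch")
    return list(groups.values())


docs = [
    {"city": "Oslo", "amt": 3, "tag": "a"},
    {"city": "Oslo", "amt": 4, "tag": "a"},
    {"city": "Bergen", "tag": "b"},
]
grouped = process_group_stage(
    {"_id": "$city", "total": {"$sum": "$amt"}, "n": {"$sum": 1}, "tags": {"$addToSet": "$tag"}},
    docs,
)
```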

_run_subpipeline(sub_pipeline: list[dict[str, Any]], docs: list[dict[str, Any]], batch_size: int = 101) → str

Run a sub-pipeline (e.g., for $facet) on a list of documents.

Uses tier optimization (Tier-1/Tier-2/Tier-3) for each sub-pipeline. Results are streamed to a temporary table in batches to avoid memory issues.

Parameters:
  • sub_pipeline (list[dict[str, Any]]) – List of pipeline stages to execute.

  • docs (list[dict[str, Any]]) – Input documents.

  • batch_size (int) – Number of documents to process in each batch.

Returns:

Name of the temporary table containing the results.

Return type:

str
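The batching idea (write results to a temporary table in fixed-size chunks and hand back the table name) can be sketched as follows. The function name, the table naming scheme, and the `(id, data)` schema are hypothetical, and the sketch only covers the streaming step, not the execution of the sub-pipeline itself.

```python
from __future__ import annotations

import json
import sqlite3
import uuid
from itertools import islice
from typing import Any, Iterable


def stream_to_temp_table(
    conn: sqlite3.Connection, results: Iterable[dict[str, Any]], batch_size: int = 101
) -> str:
    """Write result documents to a temp table in batches and return its name (sketch)."""
    table = f"temp_results_{uuid.uuid4().hex}"  # hypothetical naming scheme
    conn.execute(f"CREATE TEMP TABLE {table} (id INTEGER PRIMARY KEY, data TEXT)")
    iterator = iter(results)
    # Only one batch of serialized documents is held in memory at a time.
    while batch := list(islice(iterator, batch_size)):
        conn.executemany(
            f"INSERT INTO {table} (data) VALUES (?)",
            [(json.dumps(doc),) for doc in batch],
        )
    return table


conn = sqlite3.connect(":memory:")
table_name = stream_to_temp_table(conn, ({"n": i} for i in range(250)), batch_size=101)
row_count = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
```

Returning a table name instead of a list lets the caller read the results back lazily with an ordinary SELECT.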

_apply_projection(projection: dict[str, Any], document: dict[str, Any]) → dict[str, Any]

Applies the projection to the document, selecting or excluding fields based on the projection criteria.

Parameters:
  • projection (dict[str, Any]) – A dictionary specifying which fields to include or exclude.

  • document (dict[str, Any]) – The document to apply the projection to.

Returns:

The projected document, containing only the fields that the projection selects.

Return type:

dict[str, Any]
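A minimal sketch of inclusion and exclusion projections is shown below. It follows the MongoDB convention that _id is kept unless explicitly excluded; whether NeoSQLite's method treats _id the same way is an assumption here, and only top-level fields are handled.

```python
from __future__ import annotations

from typing import Any


def apply_projection(projection: dict[str, Any], document: dict[str, Any]) -> dict[str, Any]:
    """Project top-level fields of a document (illustrative sketch)."""
    keep_id = bool(projection.get("_id", True))
    flags = {key: bool(value) for key, value in projection.items() if key != "_id"}
    if any(flags.values()):
        # Inclusion mode: keep only the fields flagged with a truthy value.
        result = {key: value for key, value in document.items() if flags.get(key, False)}
        if keep_id and "_id" in document:
            result = {"_id": document["_id"], **result}
    else:
        # Exclusion mode: keep everything except the fields flagged with 0/False.
        result = {key: value for key, value in document.items() if flags.get(key, True)}
        if not keep_id:
            result.pop("_id", None)
    return result


doc = {"_id": 1, "name": "a", "age": 3, "city": "x"}
included = apply_projection({"name": 1}, doc)
excluded = apply_projection({"age": 0}, doc)
without_id = apply_projection({"name": 1, "_id": 0}, doc)
```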