neosqlite.collection.sql_tier_aggregator module

SQL Tier 1 Optimizer for Aggregation Pipelines.

This module implements SQL-based optimization for aggregation pipelines, with reported speedups of 10-100x over the Python fallback (Tier 3).

The optimizer analyzes aggregation pipelines and generates optimized SQL queries using CTEs (Common Table Expressions) for multi-stage pipelines.
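
As a rough illustration of the CTE approach, the sketch below hand-writes the kind of query a three-stage pipeline ($match, $group, $sort) could map to, with one CTE per stage. The table name `employees`, the `data` JSON column, and the CTE names `stage0`/`stage1` are assumptions made for this example and are not taken from the module; the SQL the optimizer actually generates may look different. It also assumes an SQLite build with the JSON functions available.

```python
import json
import sqlite3

# Hypothetical storage layout: one JSON document per row in a "data" column.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (id INTEGER PRIMARY KEY, data TEXT)")
docs = [
    {"name": "Ann", "dept": "eng", "salary": 120},
    {"name": "Bob", "dept": "eng", "salary": 100},
    {"name": "Cy", "dept": "ops", "salary": 90},
    {"name": "Di", "dept": "ops", "salary": 40},
]
conn.executemany(
    "INSERT INTO employees (data) VALUES (?)",
    [(json.dumps(d),) for d in docs],
)

# Pipeline being illustrated:
#   [{"$match": {"salary": {"$gte": 50}}},
#    {"$group": {"_id": "$dept", "total": {"$sum": "$salary"}}},
#    {"$sort": {"total": -1}}]
sql = """
WITH stage0 AS (            -- $match
    SELECT id, data FROM employees
    WHERE json_extract(data, '$.salary') >= ?
),
stage1 AS (                 -- $group
    SELECT json_extract(data, '$.dept') AS _id,
           SUM(json_extract(data, '$.salary')) AS total
    FROM stage0
    GROUP BY json_extract(data, '$.dept')
)
SELECT _id, total FROM stage1 ORDER BY total DESC   -- $sort
"""
rows = conn.execute(sql, (50,)).fetchall()
print(rows)
```

Each stage reads only from the CTE before it, which is what lets a whole pipeline run as a single SQL statement instead of materializing intermediate results in Python.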

class neosqlite.collection.sql_tier_aggregator.PipelineContext[source]

Bases: object

Tracks field aliases, computed fields, and document state across pipeline stages.

__init__() None[source]

Initialize pipeline context with default state.

add_computed_field(field: str, sql_expr: str) None[source]

Track a computed field.

remove_field(field: str) None[source]

Mark field as removed.

get_field_sql(field: str) str | None[source]

Get SQL expression for a field.

is_field_available(field: str) bool[source]

Check if field is available in current context.

is_field_computed(field: str) bool[source]

Check if field is a computed field.

preserve_root() None[source]

Mark that $$ROOT should be preserved.

needs_root() bool[source]

Check if $$ROOT is needed.

clone() PipelineContext[source]

Create a copy of this context.

class neosqlite.collection.sql_tier_aggregator.SQLTierAggregator(collection, expr_evaluator: ExprEvaluator | None = None, translation_cache_size: int | None = 100)[source]

Bases: object

SQL Tier 1 optimizer for aggregation pipelines.

SUPPORTED_STAGES = {'$addFields', '$bucket', '$bucketAuto', '$count', '$densify', '$facet', '$fill', '$graphLookup', '$group', '$limit', '$lookup', '$match', '$merge', '$project', '$redact', '$replaceRoot', '$replaceWith', '$sample', '$setWindowFields', '$skip', '$sort', '$unionWith', '$unset', '$unwind'}
UNSUPPORTED_STAGES = {'$indexStats', '$jsonSchema', '$out'}
UNSUPPORTED_EXPRESSIONS = {'$accumulator', '$function', '$jsonSchema', '$script'}
__init__(collection, expr_evaluator: ExprEvaluator | None = None, translation_cache_size: int | None = 100)[source]

Initialize the SQL tier aggregator.

_get_json_extract(path: str | None = None) str[source]

Get JSON extract function with correct prefix.

_get_json_set() str[source]

Get JSON set function with correct prefix.

can_optimize_pipeline(pipeline: list[dict[str, Any]]) bool[source]

Check if pipeline can be optimized in SQL tier.

_can_optimize_stage_expressions(stage: dict[str, Any]) bool[source]

Check if all expressions in a stage can be optimized in SQL.

_check_expression_support(obj: Any) bool[source]

Recursively check if an object contains unsupported expressions.
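
A minimal sketch of such a recursive check, assuming that the method returns True when nothing unsupported is found (the docstring does not state the return convention). The function name `is_expression_supported` is hypothetical; the operator set is copied from UNSUPPORTED_EXPRESSIONS above.

```python
from typing import Any

# Copied from the UNSUPPORTED_EXPRESSIONS attribute documented above.
UNSUPPORTED_EXPRESSIONS = {"$accumulator", "$function", "$jsonSchema", "$script"}


def is_expression_supported(obj: Any) -> bool:
    """Return False if any nested dict key is an unsupported operator."""
    if isinstance(obj, dict):
        for key, value in obj.items():
            if key in UNSUPPORTED_EXPRESSIONS:
                return False
            if not is_expression_supported(value):
                return False
        return True
    if isinstance(obj, (list, tuple)):
        return all(is_expression_supported(item) for item in obj)
    # Scalars (strings, numbers, None) cannot contain operators.
    return True


ok_stage = {"$addFields": {"bonus": {"$multiply": ["$salary", 0.1]}}}
bad_stage = {"$addFields": {"x": {"$function": {"body": "...", "args": []}}}}
print(is_expression_supported(ok_stage), is_expression_supported(bad_stage))
```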

build_pipeline_sql(pipeline: list[dict[str, Any]]) tuple[str | None, list[Any]][source]

Build optimized SQL query for entire pipeline using CTEs.

_build_sql_template(pipeline: list[dict[str, Any]]) tuple[str | None, list[Any]][source]

Build SQL template and return (template, params).

_extract_param_values(pipeline: list[dict[str, Any]], param_names: tuple[str, ...]) list[Any][source]

Extract actual parameter values from pipeline for given field paths.

_get_placeholder_values(pipeline: list[dict[str, Any]]) dict[str, Any][source]

Extract values for placeholder parameters from pipeline.

_extract_literal_values_from_expression(expr: Any, values: dict[str, Any], placeholder_idx: int) int[source]

Extract literal values from an expression and store them in values dict.

_extract_param_names_from_pipeline(pipeline: list[dict[str, Any]]) list[str][source]

Extract parameter names from pipeline structure directly.

This is more robust than parsing SQL template since it works directly with the MongoDB pipeline specification.

_extract_params_from_expression(expr: Any, params: list[str], placeholder_idx: int) int[source]

Extract field references and literal values from an expression.

For expressions like {"$multiply": ["$salary", 0.1]}:

- Field references ($salary) are extracted directly
- Literal values (0.1) are stored as placeholders

Returns the updated placeholder_idx.
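
The $multiply case described above can be sketched as follows. This is not the library's implementation: the function name `extract_params` and the placeholder naming scheme `__literal_N` are assumptions chosen for illustration, and only the argument order and the returned counter follow the documented signature.

```python
from typing import Any


def extract_params(expr: Any, params: list, placeholder_idx: int) -> int:
    """Collect field references and literal placeholders from an expression."""
    if isinstance(expr, str) and expr.startswith("$"):
        # Field reference such as "$salary": recorded as-is.
        params.append(expr)
        return placeholder_idx
    if isinstance(expr, dict):
        # Operator dict such as {"$multiply": [...]}: recurse into operands.
        for operand in expr.values():
            placeholder_idx = extract_params(operand, params, placeholder_idx)
        return placeholder_idx
    if isinstance(expr, (list, tuple)):
        for item in expr:
            placeholder_idx = extract_params(item, params, placeholder_idx)
        return placeholder_idx
    # Anything else is a literal: record a numbered placeholder
    # (hypothetical naming) and advance the counter.
    params.append(f"__literal_{placeholder_idx}")
    return placeholder_idx + 1


params = []
next_idx = extract_params({"$multiply": ["$salary", 0.1]}, params, 0)
print(params, next_idx)
```

Threading the counter through the recursion keeps placeholder numbers unique across nested expressions without any shared mutable state.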

_extract_field_paths_from_dict(d: dict, prefix: str = '$') list[str][source]

Recursively extract all field paths from a dict.

_get_value_at_path(pipeline: list[dict[str, Any]], field_path: str) Any[source]

Get value from pipeline at given field path.

_find_value_in_dict(d: dict, field_path: str) Any[source]

Recursively find value in dict matching field path.

Also handles comparison operators by extracting the actual operand.

_extract_comparison_value(d: dict) Any[source]

Extract actual value from comparison operator dict.

Handles operators like {"$gt": 25}, {"$gte": 10}, {"$in": [1,2,3]}, etc. Returns the actual operand value, not the operator dict.
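
A small sketch of this unwrapping, under the assumption that only single-key operator dicts are unwrapped and that anything else is returned unchanged (the docstring does not say how other inputs are treated). The function name and the operator list are illustrative.

```python
from typing import Any

# Assumed operator list for this sketch; the module's actual list is not documented here.
COMPARISON_OPERATORS = {"$eq", "$ne", "$gt", "$gte", "$lt", "$lte", "$in", "$nin"}


def extract_comparison_value(d: dict) -> Any:
    """Return the operand of a single-operator dict, else the dict itself."""
    if len(d) == 1:
        op, operand = next(iter(d.items()))
        if op in COMPARISON_OPERATORS:
            return operand
    return d


print(extract_comparison_value({"$gt": 25}))
print(extract_comparison_value({"$in": [1, 2, 3]}))
```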

get_cache_stats() dict[str, Any][source]

Get pipeline cache statistics.

clear_cache() None[source]

Clear the pipeline cache.

dump_cache() list[dict][source]

Dump all cache entries for debugging.

cache_contains(pipeline: list[dict]) bool[source]

Check if pipeline is in cache.

evict_from_cache(pipeline: list[dict]) bool[source]

Evict a specific pipeline from cache.

cache_size() int[source]

Get current cache size.

is_cache_enabled() bool[source]

Check if cache is enabled.

resize_cache(new_size: int) None[source]

Resize the cache.
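
The cache methods above suggest a bounded cache keyed by pipeline. The sketch below shows one way such a cache could behave, using least-recently-used eviction. Whether the library uses LRU eviction, and how it derives cache keys, is not stated in this reference; the class `PipelineCacheSketch` and all of its method names are hypothetical.

```python
import json
from collections import OrderedDict
from typing import Optional


class PipelineCacheSketch:
    """Bounded LRU cache mapping a pipeline to a SQL template (illustrative)."""

    def __init__(self, max_size: int = 100) -> None:
        self.max_size = max_size
        self._entries = OrderedDict()

    @staticmethod
    def _key(pipeline: list) -> str:
        # Assumption: a canonical JSON dump is a good enough cache key.
        return json.dumps(pipeline, sort_keys=True, default=str)

    def _evict_overflow(self) -> None:
        # Drop the least recently used entries until the cache fits.
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)

    def put(self, pipeline: list, sql_template: str) -> None:
        key = self._key(pipeline)
        self._entries[key] = sql_template
        self._entries.move_to_end(key)
        self._evict_overflow()

    def get(self, pipeline: list) -> Optional[str]:
        key = self._key(pipeline)
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def resize(self, new_size: int) -> None:
        self.max_size = new_size
        self._evict_overflow()

    def size(self) -> int:
        return len(self._entries)
```

In this sketch, shrinking the cache with `resize` evicts the oldest entries immediately rather than waiting for the next insertion.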

_pipeline_needs_root(pipeline: list[dict[str, Any]]) bool[source]

Check if pipeline uses $$ROOT variable.

_stage_uses_root(stage: dict[str, Any]) bool[source]

Check if a stage uses $$ROOT variable.

_expression_uses_root(obj: Any) bool[source]

Recursively check if expression uses $$ROOT.
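
A minimal sketch of this kind of recursive scan. It assumes that $$ROOT only appears in string values, either on its own or as a dotted path prefix such as "$$ROOT.name"; the function name is hypothetical.

```python
from typing import Any


def expression_uses_root(obj: Any) -> bool:
    """Return True if any nested string value refers to $$ROOT."""
    if isinstance(obj, str):
        return obj == "$$ROOT" or obj.startswith("$$ROOT.")
    if isinstance(obj, dict):
        return any(expression_uses_root(value) for value in obj.values())
    if isinstance(obj, (list, tuple)):
        return any(expression_uses_root(item) for item in obj)
    return False


group_stage = {"$group": {"_id": "$dept", "docs": {"$push": "$$ROOT"}}}
match_stage = {"$match": {"age": {"$gt": 25}}}
print(expression_uses_root(group_stage), expression_uses_root(match_stage))
```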

_build_stage_sql(stage_name: str, stage_spec: Any, prev_stage: str, context: PipelineContext, preserve_root: bool = False) tuple[str | None, list[Any]][source]

Build SQL for a single pipeline stage.

_build_addfields_sql(spec, prev_stage, context, preserve_root)[source]
_build_project_sql(spec, prev_stage, context, preserve_root)[source]
_build_unset_sql(spec, prev_stage, context, preserve_root)[source]
_build_replace_root_sql(spec, prev_stage, context, preserve_root)[source]
_build_sample_sql(spec, prev_stage, context)[source]
_build_bucket_sql(spec, prev_stage, context)[source]
_build_bucket_auto_sql(spec, prev_stage, context)[source]
_build_union_with_sql(spec, prev_stage, context)[source]
_build_redact_sql(spec, prev_stage, context)[source]

Build SQL for the $redact stage. Optimizes KEEP/PRUNE patterns into a WHERE clause. Falls back to Python for the DESCEND pattern, which requires recursive JSON modification.

_build_lookup_sql(spec, prev_stage, context)[source]
_build_merge_sql(spec, prev_stage, context)[source]
_build_densify_sql(spec, prev_stage, context)[source]

Build SQL for $densify stage - fills gaps in numeric sequences.

Algorithm:

1. Extract existing distinct values from the field into CTE "existing"
2. Calculate min/max bounds from existing values in CTE "bounds_calc"
3. Generate series of values from min to max with given step using CROSS JOIN with a numbers table (0-50) in CTE "series"
4. UNION ALL:
   - Original documents from prev_stage
   - New documents with densified field values (where value not in existing)
5. Use NOT EXISTS to filter out values that already exist

Example: field=[1,3,5], step=1 produces [1,2,3,4,5]

- existing: {1,3,5}
- series: {1,2,3,4,5}
- result: original docs + new docs for {2,4}

Limitations:

- Only supports numeric fields (int/float)
- Does not support partitionBy/partitionByFields (requires group-by)
- Does not support date fields or granularity
- Max gap filling limited to 1000 values (falls back to Python for larger)
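
The algorithm above can be sketched directly against SQLite. The CTE names `existing`, `bounds_calc`, and `series` follow the description; everything else is an assumption for illustration: the table `readings`, the JSON column `data`, the field name `n`, and the recursive `numbers` CTE standing in for the numbers table. The SQL the module emits may differ, and the sketch assumes an SQLite build with the JSON functions available.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany(
    "INSERT INTO readings (data) VALUES (?)",
    [(json.dumps({"n": v}),) for v in (1, 3, 5)],
)

sql = """
WITH RECURSIVE
numbers(i) AS (             -- stand-in for the 0-50 numbers table
    SELECT 0 UNION ALL SELECT i + 1 FROM numbers WHERE i < 50
),
existing AS (               -- step 1: distinct values already present
    SELECT DISTINCT json_extract(data, '$.n') AS val FROM readings
),
bounds_calc AS (            -- step 2: min/max bounds
    SELECT MIN(val) AS lo, MAX(val) AS hi FROM existing
),
series AS (                 -- step 3: lo, lo+step, ... up to hi
    SELECT b.lo + n.i * :step AS val
    FROM bounds_calc AS b CROSS JOIN numbers AS n
    WHERE b.lo + n.i * :step <= b.hi
)
SELECT data FROM readings   -- step 4: original documents
UNION ALL
SELECT json_object('n', s.val)   -- step 4: generated documents
FROM series AS s
WHERE NOT EXISTS (          -- step 5: skip values that already exist
    SELECT 1 FROM existing AS e WHERE e.val = s.val
)
"""
rows = conn.execute(sql, {"step": 1}).fetchall()
values = sorted(json.loads(row[0])["n"] for row in rows)
print(values)
```

In this sketch the generated documents carry only the densified field, and the size of the `numbers` CTE caps how many values can be generated in a single query.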

_build_group_sql(spec, prev_stage, context)[source]
_map_accumulator_to_sql(op)[source]
_build_match_sql(spec, prev_stage, context)[source]
_build_sort_sql(spec, prev_stage, context)[source]
_build_skip_sql(spec, prev_stage, context)[source]
_build_limit_sql(spec, prev_stage, context)[source]
_build_count_sql(spec, prev_stage, context)[source]
_build_set_window_fields_sql(spec, prev_stage, context, preserve_root)[source]
_map_window_op_to_sql(op)[source]
_build_graph_lookup_sql(spec, prev_stage, context, preserve_root)[source]
_build_fill_sql(spec, prev_stage, context, preserve_root)[source]
_build_passthrough_sql(prev_stage, context)[source]