neosqlite.collection.sql_tier_aggregator module
SQL Tier 1 Optimizer for Aggregation Pipelines.
This module implements SQL-based optimization for aggregation pipelines, providing a 10-100x performance improvement over the Python fallback (Tier 3).
The optimizer analyzes aggregation pipelines and generates optimized SQL queries using CTEs (Common Table Expressions) for multi-stage pipelines.
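As an illustration of the CTE approach, the sketch below hand-writes the kind of query a `$match` + `$group` + `$sort` pipeline could map to, with one CTE per stage. The table name `docs`, the JSON column `data`, and the `stage0`/`stage1` CTE names are assumptions made for this example; the SQL the optimizer actually generates may differ.

```python
import json
import sqlite3

# Hypothetical storage layout: one JSON document per row in a "data" column.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
documents = [
    {"name": "a", "age": 30, "dept": "x"},
    {"name": "b", "age": 20, "dept": "x"},
    {"name": "c", "age": 40, "dept": "y"},
    {"name": "d", "age": 15, "dept": "y"},
]
conn.executemany(
    "INSERT INTO docs (data) VALUES (?)",
    [(json.dumps(doc),) for doc in documents],
)

# Pipeline being illustrated:
#   [{"$match": {"age": {"$gt": 18}}},
#    {"$group": {"_id": "$dept", "n": {"$sum": 1}}},
#    {"$sort": {"_id": 1}}]
sql = """
WITH stage0 AS (          -- $match
    SELECT data FROM docs
    WHERE json_extract(data, '$.age') > ?
),
stage1 AS (               -- $group
    SELECT json_extract(data, '$.dept') AS _id, COUNT(*) AS n
    FROM stage0
    GROUP BY json_extract(data, '$.dept')
)
SELECT _id, n FROM stage1 ORDER BY _id   -- $sort
"""
result = conn.execute(sql, (18,)).fetchall()
print(result)
```

Each CTE reads only from the one before it, which is what lets a multi-stage pipeline run as a single SQL statement instead of passing documents through Python between stages.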
- class neosqlite.collection.sql_tier_aggregator.PipelineContext
Bases: object
Tracks field aliases, computed fields, and document state across pipeline stages.
- clone() -> PipelineContext
Create a copy of this context.
- class neosqlite.collection.sql_tier_aggregator.SQLTierAggregator(collection, expr_evaluator: ExprEvaluator | None = None, translation_cache_size: int | None = 100)
Bases: object
SQL Tier 1 optimizer for aggregation pipelines.
- SUPPORTED_STAGES = {'$addFields', '$bucket', '$bucketAuto', '$count', '$densify', '$facet', '$fill', '$graphLookup', '$group', '$limit', '$lookup', '$match', '$merge', '$project', '$redact', '$replaceRoot', '$replaceWith', '$sample', '$setWindowFields', '$skip', '$sort', '$unionWith', '$unset', '$unwind'}
- UNSUPPORTED_STAGES = {'$indexStats', '$jsonSchema', '$out'}
- UNSUPPORTED_EXPRESSIONS = {'$accumulator', '$function', '$jsonSchema', '$script'}
- __init__(collection, expr_evaluator: ExprEvaluator | None = None, translation_cache_size: int | None = 100)
Initialize the SQL tier aggregator.
- _get_json_extract(path: str | None = None) -> str
Get JSON extract function with correct prefix.
- can_optimize_pipeline(pipeline: list[dict[str, Any]]) -> bool
Check if pipeline can be optimized in SQL tier.
- _can_optimize_stage_expressions(stage: dict[str, Any]) -> bool
Check if all expressions in a stage can be optimized in SQL.
- _check_expression_support(obj: Any) -> bool
Recursively check if an object contains unsupported expressions.
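The three checks above can be sketched as a stage-name lookup combined with a recursive walk over each stage specification. This is a minimal illustration, not the module's code: `can_optimize` and `contains_unsupported` are hypothetical names, and the supported-stage set is truncated to a few entries for brevity.

```python
from typing import Any

# Truncated copy of SUPPORTED_STAGES, for illustration only.
SUPPORTED_STAGES = {"$match", "$project", "$group", "$sort", "$limit", "$skip"}
UNSUPPORTED_EXPRESSIONS = {"$accumulator", "$function", "$jsonSchema", "$script"}


def contains_unsupported(obj: Any) -> bool:
    """Return True if any dict key anywhere in obj is an unsupported operator."""
    if isinstance(obj, dict):
        return any(
            key in UNSUPPORTED_EXPRESSIONS or contains_unsupported(value)
            for key, value in obj.items()
        )
    if isinstance(obj, (list, tuple)):
        return any(contains_unsupported(item) for item in obj)
    return False  # scalars cannot contain operators


def can_optimize(pipeline: list[dict[str, Any]]) -> bool:
    """Return True only if every stage and every nested expression is supported."""
    for stage in pipeline:
        if len(stage) != 1:  # a stage is a single {name: spec} pair
            return False
        ((name, spec),) = stage.items()
        if name not in SUPPORTED_STAGES or contains_unsupported(spec):
            return False
    return True


print(can_optimize([{"$match": {"age": {"$gt": 25}}}, {"$limit": 10}]))
print(can_optimize([{"$project": {"x": {"$function": {"body": "..."}}}}]))
```

A single unsupported stage or operator makes the whole pipeline ineligible in this sketch, which matches the all-or-nothing wording of the descriptions above.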
- build_pipeline_sql(pipeline: list[dict[str, Any]]) -> tuple[str | None, list[Any]]
Build optimized SQL query for entire pipeline using CTEs.
- _build_sql_template(pipeline: list[dict[str, Any]]) -> tuple[str | None, list[Any]]
Build SQL template and return (template, params).
- _extract_param_values(pipeline: list[dict[str, Any]], param_names: tuple[str, ...]) -> list[Any]
Extract actual parameter values from pipeline for given field paths.
- _get_placeholder_values(pipeline: list[dict[str, Any]]) -> dict[str, Any]
Extract values for placeholder parameters from pipeline.
- _extract_literal_values_from_expression(expr: Any, values: dict[str, Any], placeholder_idx: int) -> int
Extract literal values from an expression and store them in values dict.
- _extract_param_names_from_pipeline(pipeline: list[dict[str, Any]]) -> list[str]
Extract parameter names from pipeline structure directly.
This is more robust than parsing the SQL template, since it works directly with the MongoDB pipeline specification.
- _extract_params_from_expression(expr: Any, params: list[str], placeholder_idx: int) -> int
Extract field references and literal values from an expression.
For expressions like {"$multiply": ["$salary", 0.1]}:
  - Field references ($salary) are extracted directly
  - Literal values (0.1) are stored as placeholders
Returns the updated placeholder_idx.
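The field-reference versus literal split described above can be sketched as a recursive walk that threads a placeholder counter through the expression. The function name `extract_params` and the `__literal_N__` placeholder format are assumptions made for this example; the names the module actually uses are not documented here.

```python
from typing import Any


def extract_params(expr: Any, params: list[str], placeholder_idx: int) -> int:
    """Append field references and literal placeholders to params.

    Returns the next unused placeholder index.
    """
    if isinstance(expr, dict):
        # Operator dict such as {"$multiply": [...]}: descend into the operands.
        for operand in expr.values():
            placeholder_idx = extract_params(operand, params, placeholder_idx)
    elif isinstance(expr, (list, tuple)):
        for item in expr:
            placeholder_idx = extract_params(item, params, placeholder_idx)
    elif isinstance(expr, str) and expr.startswith("$"):
        # Field reference: recorded as-is.
        params.append(expr)
    else:
        # Literal: recorded under a hypothetical placeholder name.
        params.append(f"__literal_{placeholder_idx}__")
        placeholder_idx += 1
    return placeholder_idx


names: list[str] = []
next_idx = extract_params({"$multiply": ["$salary", 0.1]}, names, 0)
print(names, next_idx)
```

Returning the counter rather than mutating shared state keeps placeholder numbering consistent when the same walk is repeated over several expressions in one pipeline.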
- _extract_field_paths_from_dict(d: dict, prefix: str = '$') -> list[str]
Recursively extract all field paths from a dict.
- _get_value_at_path(pipeline: list[dict[str, Any]], field_path: str) -> Any
Get value from pipeline at given field path.
- _find_value_in_dict(d: dict, field_path: str) -> Any
Recursively find value in dict matching field path.
Also handles comparison operators by extracting the actual operand.
- _extract_comparison_value(d: dict) -> Any
Extract actual value from comparison operator dict.
Handles operators like {"$gt": 25}, {"$gte": 10}, {"$in": [1,2,3]}, etc. Returns the actual operand value, not the operator dict.
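A minimal sketch of this unwrapping step follows. The operator set and the choice to return the input unchanged when it is not a single-operator dict are assumptions for illustration; the source does not say how the real method treats other inputs.

```python
from typing import Any

# Assumed set of comparison operators; the module's own list may differ.
COMPARISON_OPERATORS = {"$eq", "$ne", "$gt", "$gte", "$lt", "$lte", "$in", "$nin"}


def extract_comparison_value(d: dict) -> Any:
    """Return the operand of a single-operator comparison dict, else d itself."""
    if len(d) == 1:
        ((operator, operand),) = d.items()
        if operator in COMPARISON_OPERATORS:
            return operand
    return d


print(extract_comparison_value({"$gt": 25}))
print(extract_comparison_value({"$in": [1, 2, 3]}))
```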
- _pipeline_needs_root(pipeline: list[dict[str, Any]]) -> bool
Check if pipeline uses $$ROOT variable.
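One way to detect `$$ROOT` usage is a recursive scan of every value in the pipeline. The sketch below uses a hypothetical helper name, `uses_root`, and treats both `"$$ROOT"` and dotted forms such as `"$$ROOT.name"` as matches; whether the real method handles the dotted form is not stated in the source.

```python
from typing import Any


def uses_root(obj: Any) -> bool:
    """Return True if any string value in obj refers to the $$ROOT variable."""
    if isinstance(obj, str):
        return obj == "$$ROOT" or obj.startswith("$$ROOT.")
    if isinstance(obj, dict):
        return any(uses_root(value) for value in obj.values())
    if isinstance(obj, (list, tuple)):
        return any(uses_root(item) for item in obj)
    return False


pipeline = [{"$group": {"_id": "$dept", "docs": {"$push": "$$ROOT"}}}]
print(uses_root(pipeline))
print(uses_root([{"$match": {"age": {"$gt": 25}}}]))
```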
- _build_stage_sql(stage_name: str, stage_spec: Any, prev_stage: str, context: PipelineContext, preserve_root: bool = False) -> tuple[str | None, list[Any]]
Build SQL for a single pipeline stage.
- _build_redact_sql(spec, prev_stage, context)
Build SQL for $redact stage.
Optimizes KEEP/PRUNE patterns into a WHERE clause. Falls back to Python for the DESCEND pattern, as it requires recursive JSON modification.
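To make the KEEP/PRUNE idea concrete, the sketch below turns a narrow class of `$redact` specifications into a WHERE fragment and returns `None` for everything else, standing in for the Python fallback. It is deliberately restrictive and entirely hypothetical: the function name, the `data` column, the three supported operators, and the numeric-literal restriction are assumptions, not the module's behavior.

```python
from typing import Any, Optional

# Only operators where a missing field yields "false" in both systems.
SQL_OPS = {"$eq": "=", "$gt": ">", "$gte": ">="}


def redact_to_where(spec: Any) -> Optional[tuple[str, list[Any]]]:
    """Return (where_sql, params) for a simple KEEP/PRUNE $cond, else None."""
    cond = spec.get("$cond") if isinstance(spec, dict) else None
    if not isinstance(cond, dict):
        return None
    # Only "then KEEP, else PRUNE" is handled; $$DESCEND and others fall back.
    if cond.get("then") != "$$KEEP" or cond.get("else") != "$$PRUNE":
        return None
    test = cond.get("if")
    if not isinstance(test, dict) or len(test) != 1:
        return None
    ((op, args),) = test.items()
    if op not in SQL_OPS or not isinstance(args, list) or len(args) != 2:
        return None
    field, value = args
    is_field = (
        isinstance(field, str)
        and field.startswith("$")
        and not field.startswith("$$")
    )
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if not (is_field and is_number):
        return None
    # "$level" becomes the JSON path "$.level"; both path and value are bound.
    return f"json_extract(data, ?) {SQL_OPS[op]} ?", ["$." + field[1:], value]


keep_prune = {"$cond": {"if": {"$gte": ["$level", 3]}, "then": "$$KEEP", "else": "$$PRUNE"}}
descend = {"$cond": {"if": {"$gte": ["$level", 3]}, "then": "$$DESCEND", "else": "$$PRUNE"}}
print(redact_to_where(keep_prune))
print(redact_to_where(descend))
```

The point of the sketch is the shape of the decision: a keep-or-drop condition on the whole document is just a row filter, whereas DESCEND must rewrite nested subdocuments and so has no single-clause equivalent.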
- _build_densify_sql(spec, prev_stage, context)
Build SQL for $densify stage - fills gaps in numeric sequences.
Algorithm:
  1. Extract existing distinct values from the field into CTE “existing”
  2. Calculate min/max bounds from existing values in CTE “bounds_calc”
  3. Generate series of values from min to max with given step using CROSS JOIN with a numbers table (0-50) in CTE “series”
  4. UNION ALL:
     - Original documents from prev_stage
     - New documents with densified field values (where value not in existing)
  5. Use NOT EXISTS to filter out values that already exist
Example: field=[1,3,5], step=1 produces [1,2,3,4,5]
  - existing: {1,3,5}
  - series: {1,2,3,4,5}
  - result: original docs + new docs for {2,4}
Limitations:
  - Only supports numeric fields (int/float)
  - Does not support partitionBy/partitionByFields (requires group-by)
  - Does not support date fields or granularity
  - Max gap filling limited to 1000 values (falls back to Python for larger)
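The algorithm above can be tried out directly in SQLite. The following sketch reproduces the documented example (values 1, 3, 5 with step 1) using the same CTE names. The `docs` table, the `data` column, the field name `n`, and the recursive CTE used to build the 0-50 numbers table are assumptions for this example; how the module builds its numbers table is not stated.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id INTEGER PRIMARY KEY, data TEXT)")
conn.executemany(
    "INSERT INTO docs (data) VALUES (?)",
    [(json.dumps({"n": value}),) for value in (1, 3, 5)],
)

step = 1
sql = """
WITH RECURSIVE
numbers(i) AS (            -- assumed way to get the integers 0..50
    SELECT 0 UNION ALL SELECT i + 1 FROM numbers WHERE i < 50
),
existing AS (              -- step 1: distinct values already present
    SELECT DISTINCT json_extract(data, '$.n') AS val FROM docs
),
bounds_calc AS (           -- step 2: min/max bounds
    SELECT MIN(val) AS lo, MAX(val) AS hi FROM existing
),
series AS (                -- step 3: lo, lo+step, ... up to hi
    SELECT lo + i * ? AS val
    FROM bounds_calc CROSS JOIN numbers
    WHERE lo + i * ? <= hi
)
SELECT data FROM docs      -- step 4: original documents ...
UNION ALL
SELECT json_object('n', s.val) FROM series AS s   -- ... plus new ones
WHERE NOT EXISTS (         -- step 5: skip values that already exist
    SELECT 1 FROM existing AS e WHERE e.val = s.val
)
"""
values = sorted(json.loads(row[0])["n"] for row in conn.execute(sql, (step, step)))
print(values)
```

With a 0-50 numbers table, a single cross join covers at most 51 steps from the minimum, so wider ranges would need a larger table or a product of several.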