"""
$expr operator evaluator for NeoSQLite.
This module implements the MongoDB $expr operator using a 3-tier approach:
1. Single SQL Query (fastest) - Uses SQLite JSON functions
2. Temporary Tables (intermediate) - For complex expressions
3. Python Fallback (slowest but complete) - Always available, especially for kill switch
The evaluator ensures that SQLite and Python implementations produce identical results.
MongoDB $expr Compatibility:
- Comparison operators: $eq, $ne, $gt, $gte, $lt, $lte, $cmp
- Logical operators: $and, $or, $not, $nor
- Arithmetic operators: $add, $subtract, $multiply, $divide, $mod, $abs, $ceil, $floor, $round, $trunc
- Conditional operators: $cond, $ifNull, $switch
- Array operators: $size, $in, $arrayElemAt, $first, $last, $isArray
- Array aggregation: $sum, $avg, $min, $max
- Array transformation: $filter, $map, $reduce
- String operators: $concat, $toLower, $toUpper, $strLenBytes, $substr, $trim
- String regex: $regexMatch, $regexFind, $regexFindAll
- Date operators: $year, $month, $dayOfMonth, $hour, $minute, $second, $dayOfWeek, $dayOfYear
- Date arithmetic: $dateAdd, $dateSubtract, $dateDiff
- Type operators: $type, $convert, $toString, $toInt, $toDouble, $toBool
- Object operators: $mergeObjects, $getField, $setField
- Other: $literal, $let
- Trigonometric: $sin, $cos, $tan, $asin, $acos, $atan, $atan2
- Hyperbolic: $sinh, $cosh, $tanh, $asinh, $acosh, $atanh
- Logarithmic: $ln, $log, $log10, $log2
- Exponential/Sigmoid: $exp, $sigmoid
- Angle conversion: $degreesToRadians, $radiansToDegrees
Note: NeoSQLite extends MongoDB with $log2 (base-2 log) operator.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from ..json_path_utils import (
build_json_extract_expression as build_json_extract_expression,
)
from ..json_path_utils import (
parse_json_path as parse_json_path,
)
from ..jsonb_support import (
_get_json_each_function as _get_json_each_function,
)
from ..jsonb_support import (
_get_json_function_prefix as _get_json_function_prefix,
)
from ..jsonb_support import (
_get_json_group_array_function as _get_json_group_array_function,
)
from ..jsonb_support import (
supports_jsonb as supports_jsonb,
)
from ..jsonb_support import (
supports_jsonb_each as supports_jsonb_each,
)
# Import from submodules
from .constants import (
REMOVE_SENTINEL as REMOVE_SENTINEL,
)
from .constants import (
RESERVED_FIELDS as RESERVED_FIELDS,
)
from .constants import (
_RemoveSentinel as _RemoveSentinel,
)
from .context import (
AggregationContext as AggregationContext,
)
from .context import (
_is_aggregation_variable as _is_aggregation_variable,
)
from .context import (
_is_expression as _is_expression,
)
from .context import (
_is_field_reference as _is_field_reference,
)
from .context import (
_is_literal as _is_literal,
)
from .python_evaluators import PythonEvaluatorsMixin
from .sql_converters import SqlConvertersMixin
from .type_utils import (
_convert_to_bindata as _convert_to_bindata,
)
from .type_utils import (
_convert_to_bool as _convert_to_bool,
)
from .type_utils import (
_convert_to_bsonbindata as _convert_to_bsonbindata,
)
from .type_utils import (
_convert_to_bsonregex as _convert_to_bsonregex,
)
from .type_utils import (
_convert_to_date as _convert_to_date,
)
from .type_utils import (
_convert_to_decimal as _convert_to_decimal,
)
from .type_utils import (
_convert_to_double as _convert_to_double,
)
from .type_utils import (
_convert_to_int as _convert_to_int,
)
from .type_utils import (
_convert_to_long as _convert_to_long,
)
from .type_utils import (
_convert_to_null as _convert_to_null,
)
from .type_utils import (
_convert_to_objectid as _convert_to_objectid,
)
from .type_utils import (
_convert_to_regex as _convert_to_regex,
)
from .type_utils import (
_convert_to_string as _convert_to_string,
)
from .type_utils import (
get_bson_type as get_bson_type,
)
# Forward reference for PipelineContext to avoid circular import
if TYPE_CHECKING:
from ..sql_tier_aggregator import PipelineContext
# Public API exports
__all__ = [
# Classes
"ExprEvaluator",
"AggregationContext",
# Constants
"RESERVED_FIELDS",
"REMOVE_SENTINEL",
"_RemoveSentinel",
# Helper functions
"_is_expression",
"_is_field_reference",
"_is_aggregation_variable",
"_is_literal",
# Type conversion utilities
"_convert_to_int",
"_convert_to_long",
"_convert_to_double",
"_convert_to_decimal",
"_convert_to_string",
"_convert_to_bool",
"_convert_to_objectid",
"_convert_to_bindata",
"_convert_to_bsonbindata",
"_convert_to_regex",
"_convert_to_bsonregex",
"_convert_to_date",
"_convert_to_null",
"get_bson_type",
# JSON utilities
"parse_json_path",
"build_json_extract_expression",
# JSONB support
"supports_jsonb",
"supports_jsonb_each",
"_get_json_function_prefix",
"_get_json_each_function",
"_get_json_group_array_function",
]
[docs]
class ExprEvaluator(SqlConvertersMixin, PythonEvaluatorsMixin):
"""
Evaluator for MongoDB $expr operator.
Supports 3-tier evaluation:
- Tier 1: Direct SQL WHERE clause using JSON functions
- Tier 2: Temporary tables for complex expressions
- Tier 3: Python fallback (always available for kill switch)
JSON/JSONB Support:
- Automatically uses jsonb_* functions when supported for better performance
- Falls back to json_* functions when JSONB is not available
- Detects SQLite 3.51.0+ features (jsonb_each, jsonb_tree) for maximum performance
"""
[docs]
def __init__(self, data_column: str = "data", db_connection=None):
"""
Initialize the expression evaluator.
Args:
data_column: Name of the column containing JSON data (default: "data")
db_connection: Optional SQLite database connection for JSONB detection.
If provided, JSONB support will be auto-detected.
If None, json_* functions will be used (safe fallback).
"""
self.data_column = data_column
self._jsonb_supported = False
self._jsonb_each_supported = False
self._log2_warned = False # Track if we've warned about $log2
self._current_context = None # Temporary context for SQL conversion
if db_connection is not None:
self._jsonb_supported = supports_jsonb(db_connection)
self._jsonb_each_supported = supports_jsonb_each(db_connection)
@property
def json_function_prefix(self) -> str:
"""Get the appropriate JSON function prefix (json or jsonb)."""
return _get_json_function_prefix(self._jsonb_supported)
@property
def json_each_function(self) -> str:
"""Get the appropriate json_each function name (json_each or jsonb_each)."""
return _get_json_each_function(
self._jsonb_supported, self._jsonb_each_supported
)
@property
def json_group_array_function(self) -> str:
"""Get the appropriate json_group_array function name."""
return _get_json_group_array_function(self._jsonb_supported)
[docs]
def evaluate(
self, expr: dict[str, Any], tier: int = 1, force_python: bool = False
) -> tuple[str | None, list[Any]]:
"""
Evaluate a $expr expression.
Args:
expr: The $expr expression dictionary
tier: Complexity tier (1=SQL, 2=TempTable, 3=Python)
force_python: Force Python evaluation (kill switch)
Returns:
Tuple of (SQL WHERE clause, parameters) or (None, []) for Python evaluation
"""
if force_python or tier >= 3:
return None, []
match tier:
case 1:
return self._evaluate_sql_tier1(expr)
case 2:
return self._evaluate_sql_tier2(expr)
case _:
return None, []
[docs]
def _evaluate_sql_tier1(
self, expr: dict[str, Any]
) -> tuple[str | None, list[Any]]:
"""
Tier 1: Convert simple expressions to SQL WHERE clauses using JSON functions.
Supports basic operators and field comparisons.
"""
try:
sql_expr, params = self._convert_expr_to_sql(expr)
if sql_expr is None:
return None, []
return f"({sql_expr})", params
except (NotImplementedError, ValueError):
return None, []
[docs]
def _evaluate_sql_tier2(
self, expr: dict[str, Any]
) -> tuple[str | None, list[Any]]:
"""
Tier 2: Use temporary tables for complex expressions.
This tier is used when:
- Expressions are too complex for Tier 1 (single SQL WHERE clause)
- Multiple intermediate calculations are needed
- The expression can benefit from pre-computed field extractions
Args:
expr: The $expr expression
Returns:
Tuple of (SQL expression, parameters) or (None, []) for Python fallback
"""
# Tier 2 requires database connection which is passed from query_helper
# For now, this is a placeholder that will be called from query_helper
# with the proper database connection
return None, []
[docs]
def evaluate_for_aggregation(
self,
expr: Any,
context: AggregationContext | None = None,
as_alias: str | None = None,
) -> tuple[str, list[Any]]:
"""
Evaluate expression for aggregation pipeline.
This method evaluates expressions for use in aggregation pipeline stages
like $addFields, $project, $group, etc. Unlike the evaluate() method which
generates WHERE clause expressions, this method generates SELECT clause
expressions with optional aliases.
Args:
expr: Expression to evaluate. Can be:
- Dict: Expression like {"$sin": "$angle"}
- Str: Field reference like "$field" or variable like "$$ROOT"
- Literal: Number, string, boolean, None, array, or dict
context: Aggregation context for variable scoping. If None, a new
context will be created.
as_alias: Optional alias for SELECT clause (e.g., "AS field_name")
Returns:
Tuple of (SQL expression, parameters). The SQL expression will include
the alias if as_alias is provided.
Raises:
NotImplementedError: If the expression operator is not supported in
SQL tier for aggregation context
Examples:
>>> evaluator = ExprEvaluator()
>>> evaluator.evaluate_for_aggregation({"$sin": "$angle"})
("sin(json_extract(data, '$.angle'))", [])
>>> evaluator.evaluate_for_aggregation({"$sin": "$angle"}, as_alias="sin_val")
("sin(json_extract(data, '$.angle')) AS sin_val", [])
>>> evaluator.evaluate_for_aggregation("$field")
("json_extract(data, '$.field')", [])
>>> evaluator.evaluate_for_aggregation(42)
("?", [42])
"""
if context is None:
context = AggregationContext()
# Set temporary context for SQL conversion
old_context = self._current_context
self._current_context = context
try:
sql, params = self._convert_operand_to_sql_agg(expr, context)
if as_alias:
sql = f"{sql} AS {as_alias}"
return sql, params
finally:
# Restore previous context
self._current_context = old_context
[docs]
def _convert_operand_to_sql_agg(
self, operand: Any, context: AggregationContext
) -> tuple[str, list[Any]]:
"""
Convert an operand to SQL for aggregation context.
This method handles different types of operands:
- Expressions: Recursively evaluate using _convert_expr_to_sql
- Field references: Convert to json_extract calls
- Aggregation variables: Handle $$ROOT, $$CURRENT, etc.
- Literals: Convert to parameterized SQL
Args:
operand: The operand to convert
context: Aggregation context for variable scoping
Returns:
Tuple of (SQL expression, parameters)
Raises:
NotImplementedError: If the operand type is not supported
"""
# Handle aggregation variables first ($$ROOT, $$CURRENT, etc.)
if _is_aggregation_variable(operand):
return self._handle_aggregation_variable(operand, context)
# For other types, use the existing _convert_operand_to_sql method
# which handles expressions, field references, and literals
return self._convert_operand_to_sql(operand)
[docs]
def _handle_aggregation_variable(
self, var_name: str, context: AggregationContext
) -> tuple[str, list[Any]]:
"""
Handle aggregation variable references.
Args:
var_name: Variable name (e.g., "$$ROOT", "$$CURRENT")
context: Aggregation context
Returns:
Tuple of (SQL expression, parameters)
Raises:
NotImplementedError: If the variable is not supported
"""
match var_name:
case "$$ROOT":
# Return the entire document as JSON
# In aggregation context, this is the data column itself
return self.data_column, []
case "$$CURRENT":
# Return the current document state
# For SQL tier, this is the same as $$ROOT since we don't
# modify documents in-place in SQL
return self.data_column, []
case "$$REMOVE":
# Sentinel value for field removal in $project
# This is handled at the application level, not SQL
# Return a special marker that can be detected
raise NotImplementedError(
"$$REMOVE is handled at the application level (use REMOVE_SENTINEL)"
)
case _:
# Check for custom variables defined via $let
if (
var_name in context.variables
and context.variables[var_name] is not None
):
var_val = context.variables[var_name]
# If it's already a Tuple(sql, params), return it
if (
isinstance(var_val, tuple)
and len(var_val) == 2
and isinstance(var_val[0], str)
):
return var_val
# Unknown variable
raise NotImplementedError(
f"Aggregation variable {var_name} not supported in SQL tier"
)
[docs]
def build_select_expression(
self,
expr: Any,
alias: str | None = None,
context: AggregationContext | None = None,
) -> tuple[str, list[Any]]:
"""
Build SELECT clause expression for aggregation (SQL Tier 1 optimized).
This method is similar to evaluate_for_aggregation() but optimized for
SQL tier usage. It handles field aliasing and context tracking for
multi-stage pipelines.
Args:
expr: Expression to evaluate. Can be:
- Dict: Expression like {"$sin": "$angle"}
- Str: Field reference like "$field" or variable like "$$ROOT"
- Literal: Number, string, boolean, None, array, or dict
alias: Optional alias for SELECT clause (e.g., "AS field_name")
context: Aggregation context for variable scoping. If None, creates
a new context. This allows tracking computed fields across
pipeline stages.
Returns:
Tuple of (SQL expression, parameters). The SQL expression will include
the alias if alias is provided.
Raises:
NotImplementedError: If the expression operator is not supported in
SQL tier for aggregation context
Examples:
>>> evaluator = ExprEvaluator()
>>> evaluator.build_select_expression({"$sin": "$angle"})
("sin(json_extract(data, '$.angle'))", [])
>>> evaluator.build_select_expression({"$sin": "$angle"}, alias="sin_val")
("sin(json_extract(data, '$.angle')) AS sin_val", [])
>>> evaluator.build_select_expression("$field")
("json_extract(data, '$.field')", [])
>>> evaluator.build_select_expression(42)
("?", [42])
"""
if context is None:
context = AggregationContext()
# Set temporary context for SQL conversion
old_context = self._current_context
self._current_context = context
try:
sql, params = self._convert_operand_to_sql_agg(expr, context)
if alias:
sql = f"{sql} AS {alias}"
return sql, params
finally:
# Restore previous context
self._current_context = old_context
[docs]
def build_group_by_expression(
self,
expr: Any,
context: AggregationContext | None = None,
) -> tuple[str, list[Any]]:
"""
Build GROUP BY clause expression for aggregation (SQL Tier 1 optimized).
This method is optimized for grouping operations. It's similar to
build_select_expression() but doesn't include aliases since GROUP BY
clauses reference expressions directly.
Args:
expr: Expression to evaluate for GROUP BY
context: Aggregation context for variable scoping
Returns:
Tuple of (SQL expression, parameters)
Examples:
>>> evaluator.build_group_by_expression("$category")
("json_extract(data, '$.category')", [])
>>> evaluator.build_group_by_expression({"$toLower": "$category"})
("lower(json_extract(data, '$.category'))", [])
"""
if context is None:
context = AggregationContext()
# For GROUP BY, we just need the expression without alias
return self._convert_operand_to_sql_agg(expr, context)
[docs]
def build_having_expression(
self,
expr: Any,
context: AggregationContext | None = None,
) -> tuple[str, list[Any]]:
"""
Build HAVING clause expression for post-aggregation filtering.
HAVING expressions are evaluated after aggregation, so they can
reference aggregate results.
Args:
expr: Expression to evaluate for HAVING clause
context: Aggregation context for variable scoping
Returns:
Tuple of (SQL expression, parameters)
Examples:
>>> evaluator.build_having_expression({"$gt": ["$total", 100]})
("(json_extract(data, '$.total') > ?)", [100])
"""
if context is None:
context = AggregationContext()
# HAVING expressions are similar to WHERE expressions
# Use _convert_expr_to_sql for boolean expressions
if isinstance(expr, dict) and len(expr) == 1:
key = next(iter(expr.keys()))
if key.startswith("$"):
return self._convert_expr_to_sql(expr)
# For non-expression operands, convert normally
return self._convert_operand_to_sql_agg(expr, context)
[docs]
def _handle_aggregation_variable_sql_tier(
self, var_name: str, context: PipelineContext | None = None
) -> str:
"""
Handle aggregation variable references for SQL tier.
This is a SQL-tier-specific version that works with PipelineContext
instead of AggregationContext.
Args:
var_name: Variable name (e.g., "$$ROOT", "$$CURRENT")
context: Pipeline context for tracking fields
Returns:
SQL expression string
Raises:
NotImplementedError: If the variable is not supported
"""
match var_name:
case "$$ROOT":
# In SQL tier, root_data is preserved in a separate column
return "root_data"
case "$$CURRENT":
# Current document state is in the data column
return "data"
case "$$REMOVE":
# Sentinel for field removal - handled at application level
return "$$REMOVE"
case _:
raise NotImplementedError(
f"Aggregation variable {var_name} not supported in SQL tier"
)